diff --git a/hugegraph-store/hg-store-rocksdb/pom.xml b/hugegraph-store/hg-store-rocksdb/pom.xml new file mode 100644 index 0000000000..73c07b1c99 --- /dev/null +++ b/hugegraph-store/hg-store-rocksdb/pom.xml @@ -0,0 +1,79 @@ + + + + + 4.0.0 + + + org.apache.hugegraph + hugegraph-store + ${revision} + ../pom.xml + + + hg-store-rocksdb + + + + org.apache.hugegraph + hugegraph-common + 1.3.0 + + + org.glassfish.jersey.inject + * + + + org.glassfish.jersey.media + * + + + org.glassfish.jersey.connectors + * + + + + + org.apache.hugegraph + hg-store-common + + + org.rocksdb + rocksdbjni + 7.7.3 + + + org.projectlombok + lombok + 1.18.22 + provided + + + org.apache.logging.log4j + log4j-slf4j-impl + 2.15.0 + + + com.alibaba + fastjson + 1.2.83 + + + diff --git a/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/DBSnapshotMeta.java b/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/DBSnapshotMeta.java new file mode 100644 index 0000000000..e45a7eff54 --- /dev/null +++ b/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/DBSnapshotMeta.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.rocksdb.access; + +import java.util.Date; +import java.util.HashMap; + +import lombok.Data; + +/** + * metadata of a Rocksdb snapshot + */ +@Data +public class DBSnapshotMeta { + + /** + * graph name + */ + private String graphName; + /** + * partition id + */ + private int partitionId; + /** + * star key with base64 encoding + */ + private byte[] startKey; + /** + * end key with base64 encoding + */ + private byte[] endKey; + /** + * created time of the snapshot + */ + private Date createdDate; + /** + * sst files, key is column family name value is the sst file name of the column + * family + */ + private HashMap sstFiles; +} diff --git a/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/DBStoreException.java b/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/DBStoreException.java new file mode 100644 index 0000000000..593a850fd1 --- /dev/null +++ b/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/DBStoreException.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.rocksdb.access; + +import java.io.InterruptedIOException; + +public class DBStoreException extends RuntimeException { + + private static final long serialVersionUID = 5956983547131986887L; + + public DBStoreException(String message) { + super(message); + } + + public DBStoreException(String message, Throwable cause) { + super(message, cause); + } + + public DBStoreException(String message, Object... args) { + super(String.format(message, args)); + } + + public DBStoreException(String message, Throwable cause, Object... args) { + super(String.format(message, args), cause); + } + + public DBStoreException(Throwable cause) { + this("Exception in DBStore " + cause.getMessage(), cause); + } + + public static Throwable rootCause(Throwable e) { + Throwable cause = e; + while (cause.getCause() != null) { + cause = cause.getCause(); + } + return cause; + } + + public static boolean isInterrupted(Throwable e) { + Throwable rootCause = DBStoreException.rootCause(e); + return rootCause instanceof InterruptedException || + rootCause instanceof InterruptedIOException; + } + + public Throwable rootCause() { + return rootCause(this); + } +} diff --git a/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/RocksDBFactory.java b/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/RocksDBFactory.java new file mode 100644 index 0000000000..2ec56fa2bd --- /dev/null +++ b/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/RocksDBFactory.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.rocksdb.access; + +import java.io.File; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import org.apache.commons.io.FileUtils; +import org.apache.hugegraph.config.HugeConfig; +import org.rocksdb.AbstractEventListener; +import org.rocksdb.Cache; +import org.rocksdb.CompactionJobInfo; +import org.rocksdb.MemoryUsageType; +import org.rocksdb.MemoryUtil; +import org.rocksdb.RocksDB; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public final class RocksDBFactory { + + private static final List rocksdbChangedListeners = new ArrayList<>(); + private static RocksDBFactory dbFactory; + + static { + RocksDB.loadLibrary(); + } + + private final Map dbSessionMap = new ConcurrentHashMap<>(); + private final List destroyGraphDBs = new CopyOnWriteArrayList<>(); + private final ReentrantReadWriteLock operateLock; + ScheduledExecutorService scheduledExecutor; + private HugeConfig hugeConfig; + + private RocksDBFactory() { + this.operateLock = new ReentrantReadWriteLock(); + scheduledExecutor = Executors.newScheduledThreadPool(2); + scheduledExecutor.scheduleWithFixedDelay(() -> { + try { + Iterator itr = destroyGraphDBs.listIterator(); + while (itr.hasNext()) { + DBSessionWatcher watcher = itr.next(); + if (0 == watcher.dbSession.getRefCount()) { + try { + watcher.dbSession.shutdown(); + FileUtils.deleteDirectory(new File(watcher.dbSession.getDbPath())); + rocksdbChangedListeners.forEach(listener -> { + listener.onDBDeleted(watcher.dbSession.getGraphName(), + watcher.dbSession.getDbPath()); + }); + log.info("removed db {} and delete files", + watcher.dbSession.getDbPath()); + } catch (Exception e) { + log.error("DestroyGraphDB exception {}", e); + } + destroyGraphDBs.remove(watcher); + } else if (watcher.timestamp < (System.currentTimeMillis() - 1800 * 1000)) { + log.warn("DB {} has not been deleted refCount is {}, time is {} seconds", + watcher.dbSession.getDbPath(), + watcher.dbSession.getRefCount(), + (System.currentTimeMillis() - watcher.timestamp) / 1000); + } else { + // 超时强制删除 (30min) + watcher.dbSession.forceResetRefCount(); + } + } + + } catch (Exception e) { + log.error("RocksDBFactory scheduledExecutor exception {}", e); + } + }, 60, 60, TimeUnit.SECONDS); + } + + public static RocksDBFactory getInstance() { + if (dbFactory == null) { + synchronized (RocksDBFactory.class) { + if (dbFactory == null) { + dbFactory = new RocksDBFactory(); + } + } + } + return dbFactory; + } + + public int getSessionSize() { + return dbSessionMap.size(); + } + + public Set getGraphNames() { + return dbSessionMap.keySet(); + } + + public HugeConfig getHugeConfig() { + return this.hugeConfig; + } + + public void setHugeConfig(HugeConfig nodeConfig) { + this.hugeConfig = nodeConfig; + } + + public boolean findPathInRemovedList(String path) { + for (DBSessionWatcher pair : destroyGraphDBs) { + if (pair.dbSession.getDbPath().equals(path)) { + return true; + } + } + return false; + } + + public RocksDBSession queryGraphDB(String dbName) { + operateLock.readLock().lock(); + try { + RocksDBSession session = dbSessionMap.get(dbName); + if (session != null) { + return session.clone(); + } + } finally { + operateLock.readLock().unlock(); + } + return null; + } + + public RocksDBSession createGraphDB(String dbPath, String dbName) { + return createGraphDB(dbPath, dbName, 0); + } + + public RocksDBSession createGraphDB(String dbPath, String dbName, long version) { + operateLock.writeLock().lock(); + try { + RocksDBSession dbSession = dbSessionMap.get(dbName); + if (dbSession == null) { + log.info("create rocksdb for {}", dbName); + dbSession = new RocksDBSession(this.hugeConfig, dbPath, dbName, version); + dbSessionMap.put(dbName, dbSession); + } + return dbSession.clone(); + } finally { + operateLock.writeLock().unlock(); + } + } + + /** + * @param : + * @return long + * @description the size(KB) of the total rocksdb's data. + */ + public long getTotalSize() { + long kbSize = dbSessionMap.entrySet() + .stream() + .map(e -> e.getValue().getApproximateDataSize()) + .reduce(0L, Long::sum); + return kbSize; + } + + public Map getTotalKey() { + Map totalKeys = dbSessionMap.entrySet().stream() + .collect(Collectors.toMap(e -> e.getKey(), + e -> e.getValue() + .getEstimateNumKeys())); + return totalKeys; + } + + /** + * 释放rocksdb对象 + * + * @param dbName + * @return + */ + public boolean releaseGraphDB(String dbName) { + log.info("close {} 's rocksdb.", dbName); + operateLock.writeLock().lock(); + try { + RocksDBSession dbSession = dbSessionMap.get(dbName); + if (dbSession != null) { + dbSessionMap.remove(dbName); + rocksdbChangedListeners.forEach(listener -> { + listener.onDBSessionReleased(dbSession); + }); + dbSession.close(); + } + } finally { + operateLock.writeLock().unlock(); + } + + return false; + } + + /** + * 销毁图,并删除数据文件 + * + * @param dbName + */ + public void destroyGraphDB(String dbName) { + log.info("destroy {} 's rocksdb.", dbName); + RocksDBSession dbSession = dbSessionMap.get(dbName); + releaseGraphDB(dbName); + //增加删除标记 + if (dbSession != null) { + destroyGraphDBs.add(new DBSessionWatcher(dbSession)); + rocksdbChangedListeners.forEach(listener -> { + listener.onDBDeleteBegin(dbSession.getGraphName(), dbSession.getDbPath()); + }); + } + } + + public void releaseAllGraphDB() { + log.info("close all rocksdb."); + operateLock.writeLock().lock(); + try { + dbSessionMap.forEach((k, v) -> { + v.shutdown(); + }); + dbSessionMap.clear(); + } finally { + operateLock.writeLock().unlock(); + } + } + + public Map getApproximateMemoryUsageByType(List dbs, + List caches) { + if (dbs == null) { + dbs = new ArrayList<>(); + } else { + dbs = new ArrayList<>(dbs); + } + List sessions = new ArrayList<>(); + for (String dbName : getGraphNames()) { + RocksDBSession session = this.queryGraphDB(dbName); + if (session != null) { + dbs.add(session.getDB()); + sessions.add(session); + } + } + try { + HashSet allCaches = new HashSet<>(); + if (caches != null) { + allCaches.addAll(caches); + } + allCaches.add((Cache) hugeConfig.getProperty(RocksDBOptions.WRITE_CACHE)); + allCaches.add((Cache) hugeConfig.getProperty(RocksDBOptions.BLOCK_CACHE)); + return MemoryUtil.getApproximateMemoryUsageByType(dbs, allCaches); + } finally { + sessions.forEach(session -> { + session.close(); + }); + } + } + + public void addRocksdbChangedListener(RocksdbChangedListener listener) { + rocksdbChangedListeners.add(listener); + } + + public interface RocksdbChangedListener { + + default void onCompacted(String dbName) { + } + + default void onDBDeleteBegin(String dbName, String filePath) { + } + + default void onDBDeleted(String dbName, String filePath) { + } + + default void onDBSessionReleased(RocksDBSession dbSession) { + } + } + + class RocksdbEventListener extends AbstractEventListener { + + @Override + public void onCompactionCompleted(RocksDB db, CompactionJobInfo compactionJobInfo) { + super.onCompactionCompleted(db, compactionJobInfo); + rocksdbChangedListeners.forEach(listener -> { + listener.onCompacted(db.getName()); + }); + } + + @Override + public void onCompactionBegin(final RocksDB db, final CompactionJobInfo compactionJobInfo) { + log.info("RocksdbEventListener onCompactionBegin"); + } + } + + class DBSessionWatcher { + + public RocksDBSession dbSession; + public Long timestamp; + + public DBSessionWatcher(RocksDBSession dbSession) { + this.dbSession = dbSession; + timestamp = System.currentTimeMillis(); + } + } +} diff --git a/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/RocksDBOptions.java b/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/RocksDBOptions.java new file mode 100644 index 0000000000..f2fab66bc6 --- /dev/null +++ b/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/RocksDBOptions.java @@ -0,0 +1,435 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.rocksdb.access; + +import static org.apache.hugegraph.config.OptionChecker.allowValues; +import static org.apache.hugegraph.config.OptionChecker.disallowEmpty; +import static org.apache.hugegraph.config.OptionChecker.inValues; +import static org.apache.hugegraph.config.OptionChecker.rangeDouble; +import static org.apache.hugegraph.config.OptionChecker.rangeInt; + +import org.apache.hugegraph.config.ConfigConvOption; +import org.apache.hugegraph.config.ConfigListConvOption; +import org.apache.hugegraph.config.ConfigOption; +import org.apache.hugegraph.config.OptionHolder; +import org.apache.hugegraph.util.Bytes; +import org.rocksdb.CompactionStyle; +import org.rocksdb.CompressionType; +import org.rocksdb.InfoLogLevel; + +public class RocksDBOptions extends OptionHolder { + + public static final ConfigOption TOTAL_MEMORY_SIZE = + new ConfigOption<>( + "rocksdb.total_memory_size", + "Limit total memory of memtables for all dbs", + rangeInt(0L, Long.MAX_VALUE), + 48L * Bytes.GB + ); + public static final ConfigOption WRITE_BUFFER_RATIO = + new ConfigOption<>( + "rocksdb.write_buffer_ratio", + "write buffer ratio", + rangeDouble(0.0, 1.0), + 0.66 + ); + public static final ConfigOption WRITE_BUFFER_ALLOW_STALL = + new ConfigOption<>( + "rocksdb.write_buffer_allow_stall", + " if set true, it will enable stalling of writes when memory_usage() exceeds " + + "buffer_size." + + " It will wait for flush to complete and memory usage to drop down", + disallowEmpty(), + false + ); + + // public static final ConfigListOption DATA_DISKS = +// new ConfigListOption<>( +// "rocksdb.data_disks", +// false, +// "The optimized disks for storing data of RocksDB. " + +// "The format of each element: `STORE/TABLE: /path/disk`." + +// "Allowed keys are [g/vertex, g/edge_out, g/edge_in, " + +// "g/vertex_label_index, g/edge_label_index, " + +// "g/range_int_index, g/range_float_index, " + +// "g/range_long_index, g/range_double_index, " + +// "g/secondary_index, g/search_index, g/shard_index, " + +// "g/unique_index, g/olap]", +// null, +// String.class, +// ImmutableList.of() +// ); + public static final ConfigOption SST_PATH = + new ConfigOption<>( + "rocksdb.sst_path", + "The path for ingesting SST file into RocksDB.", + null, + "" + ); + + public static final ConfigOption LOG_LEVEL = + new ConfigOption<>( + "rocksdb.log_level", + "The info log level of RocksDB.", + allowValues(InfoLogLevel.DEBUG_LEVEL, + InfoLogLevel.INFO_LEVEL, + InfoLogLevel.WARN_LEVEL, + InfoLogLevel.ERROR_LEVEL, + InfoLogLevel.FATAL_LEVEL, + InfoLogLevel.HEADER_LEVEL), + InfoLogLevel.INFO_LEVEL + ); + + public static final ConfigOption NUM_LEVELS = + new ConfigOption<>( + "rocksdb.num_levels", + "Set the number of levels for this database.", + rangeInt(1, Integer.MAX_VALUE), + 7 + ); + public static final ConfigOption BLOCK_CACHE_CAPACITY = + new ConfigOption<>( + "rocksdb.block_cache_capacity", + "The amount of block cache in bytes that will be used by all RocksDBs", + rangeInt(0L, Long.MAX_VALUE), + 16L * Bytes.GB + ); + public static final ConfigOption SNAPSHOT_PATH = + new ConfigOption<>( + "rocksdb.snapshot_path", + "The path for storing snapshot of RocksDB.", + disallowEmpty(), + "rocksdb-snapshot" + ); + public static final ConfigConvOption COMPACTION_STYLE = + new ConfigConvOption<>( + "rocksdb.compaction_style", + "Set compaction style for RocksDB: LEVEL/UNIVERSAL/FIFO.", + allowValues("LEVEL", "UNIVERSAL", "FIFO"), + CompactionStyle::valueOf, + "LEVEL" + ); + public static final ConfigOption OPTIMIZE_MODE = + new ConfigOption<>( + "rocksdb.optimize_mode", + "Optimize for heavy workloads and big datasets.", + disallowEmpty(), + true + ); + public static final ConfigListConvOption LEVELS_COMPRESSIONS = + new ConfigListConvOption<>( + "rocksdb.compression_per_level", + "The compression algorithms for different levels of RocksDB, " + + "allowed values are none/snappy/z/bzip2/lz4/lz4hc/xpress/zstd.", + inValues("none", "snappy", "z", "bzip2", "lz4", "lz4hc", "xpress", "zstd"), + CompressionType::getCompressionType, + "none", "none", "snappy", "snappy", "snappy", "snappy", "snappy" + ); + public static final ConfigConvOption BOTTOMMOST_COMPRESSION = + new ConfigConvOption<>( + "rocksdb.bottommost_compression", + "The compression algorithm for the bottommost level of RocksDB, " + + "allowed values are none/snappy/z/bzip2/lz4/lz4hc/xpress/zstd.", + allowValues("none", "snappy", "z", "bzip2", "lz4", "lz4hc", "xpress", "zstd"), + CompressionType::getCompressionType, + "none" + ); + public static final ConfigConvOption COMPRESSION = + new ConfigConvOption<>( + "rocksdb.compression", + "The compression algorithm for compressing blocks of RocksDB, " + + "allowed values are none/snappy/z/bzip2/lz4/lz4hc/xpress/zstd.", + allowValues("none", "snappy", "z", "bzip2", "lz4", "lz4hc", "xpress", "zstd"), + CompressionType::getCompressionType, + "snappy" + ); + public static final ConfigOption MAX_BG_JOBS = + new ConfigOption<>( + "rocksdb.max_background_jobs", + "Maximum number of concurrent background jobs, including flushes and " + + "compactions.", + rangeInt(1, Integer.MAX_VALUE), + 8 + ); + public static final ConfigOption MAX_SUB_COMPACTIONS = + new ConfigOption<>( + "rocksdb.max_subcompactions", + "The value represents the maximum number of threads per compaction job.", + rangeInt(1, Integer.MAX_VALUE), + 4 + ); + public static final ConfigOption DELAYED_WRITE_RATE = + new ConfigOption<>( + "rocksdb.delayed_write_rate", + "The rate limit in bytes/s of user write requests " + + "when need to slow down if the compaction gets behind.", + rangeInt(1L, Long.MAX_VALUE), + 64L * Bytes.MB + ); + public static final ConfigOption MAX_OPEN_FILES = + new ConfigOption<>( + "rocksdb.max_open_files", + "The maximum number of open files that can be cached by RocksDB, " + + "-1 means no limit.", + rangeInt(-1, Integer.MAX_VALUE), + -1 + ); + public static final ConfigOption MAX_MANIFEST_FILE_SIZE = + new ConfigOption<>( + "rocksdb.max_manifest_file_size", + "The max size of manifest file in bytes.", + rangeInt(1L, Long.MAX_VALUE), + 100L * Bytes.MB + ); + public static final ConfigOption SKIP_STATS_UPDATE_ON_DB_OPEN = + new ConfigOption<>( + "rocksdb.skip_stats_update_on_db_open", + "Whether to skip statistics update when opening the database, " + + "setting this flag true allows us to not update statistics.", + disallowEmpty(), + false + ); + public static final ConfigOption MAX_FILE_OPENING_THREADS = + new ConfigOption<>( + "rocksdb.max_file_opening_threads", + "The max number of threads used to open files.", + rangeInt(1, Integer.MAX_VALUE), + 16 + ); + public static final ConfigOption MAX_TOTAL_WAL_SIZE = + new ConfigOption<>( + "rocksdb.max_total_wal_size", + "Total size of WAL files in bytes. Once WALs exceed this size, " + + "we will start forcing the flush of column families related, " + + "0 means no limit.", + rangeInt(0L, Long.MAX_VALUE), + 0L + ); + public static final ConfigOption DB_MEMTABLE_SIZE = + new ConfigOption<>( + "rocksdb.db_write_buffer_size", + "Total size of write buffers in bytes across all column families, " + + "0 means no limit.", + rangeInt(0L, Long.MAX_VALUE), + 0L + ); + public static final ConfigOption DELETE_OBSOLETE_FILE_PERIOD = + new ConfigOption<>( + "rocksdb.delete_obsolete_files_period", + "The periodicity in seconds when obsolete files get deleted, " + + "0 means always do full purge.", + rangeInt(0L, Long.MAX_VALUE), + 6L * 60 * 60 + ); + public static final ConfigOption MEMTABLE_SIZE = + new ConfigOption<>( + "rocksdb.write_buffer_size", + "Amount of data in bytes to build up in memory.", + rangeInt(Bytes.MB, Long.MAX_VALUE), + 32L * Bytes.MB + ); + public static final ConfigOption MAX_MEMTABLES = + new ConfigOption<>( + "rocksdb.max_write_buffer_number", + "The maximum number of write buffers that are built up in memory.", + rangeInt(1, Integer.MAX_VALUE), + 32 + ); + public static final ConfigOption MIN_MEMTABLES_TO_MERGE = + new ConfigOption<>( + "rocksdb.min_write_buffer_number_to_merge", + "The minimum number of write buffers that will be merged together.", + rangeInt(1, Integer.MAX_VALUE), + 16 + ); + public static final ConfigOption MAX_MEMTABLES_TO_MAINTAIN = + new ConfigOption<>( + "rocksdb.max_write_buffer_number_to_maintain", + "The total maximum number of write buffers to maintain in memory.", + rangeInt(0, Integer.MAX_VALUE), + 0 + ); + public static final ConfigOption DYNAMIC_LEVEL_BYTES = + new ConfigOption<>( + "rocksdb.level_compaction_dynamic_level_bytes", + "Whether to enable level_compaction_dynamic_level_bytes, " + + "if it's enabled we give max_bytes_for_level_multiplier a " + + "priority against max_bytes_for_level_base, the bytes of " + + "base level is dynamic for a more predictable LSM tree, " + + "it is useful to limit worse case space amplification. " + + "Turning this feature on/off for an existing DB can cause " + + "unexpected LSM tree structure so it's not recommended.", + disallowEmpty(), + false + ); + public static final ConfigOption MAX_LEVEL1_BYTES = + new ConfigOption<>( + "rocksdb.max_bytes_for_level_base", + "The upper-bound of the total size of level-1 files in bytes.", + rangeInt(Bytes.MB, Long.MAX_VALUE), + 10L * Bytes.GB + ); + public static final ConfigOption MAX_LEVEL_BYTES_MULTIPLIER = + new ConfigOption<>( + "rocksdb.max_bytes_for_level_multiplier", + "The ratio between the total size of level (L+1) files and " + + "the total size of level L files for all L.", + rangeDouble(1.0, Double.MAX_VALUE), + 10.0 + ); + public static final ConfigOption TARGET_FILE_SIZE_BASE = + new ConfigOption<>( + "rocksdb.target_file_size_base", + "The target file size for compaction in bytes.", + rangeInt(Bytes.MB, Long.MAX_VALUE), + 256L * Bytes.MB + ); + public static final ConfigOption TARGET_FILE_SIZE_MULTIPLIER = + new ConfigOption<>( + "rocksdb.target_file_size_multiplier", + "The size ratio between a level L file and a level (L+1) file.", + rangeInt(1, Integer.MAX_VALUE), + 2 + ); + public static final ConfigOption LEVEL0_COMPACTION_TRIGGER = + new ConfigOption<>( + "rocksdb.level0_file_num_compaction_trigger", + "Number of files to trigger level-0 compaction.", + rangeInt(0, Integer.MAX_VALUE), + 10 + ); + public static final ConfigOption LEVEL0_SLOWDOWN_WRITES_TRIGGER = + new ConfigOption<>( + "rocksdb.level0_slowdown_writes_trigger", + "Soft limit on number of level-0 files for slowing down writes.", + rangeInt(-1, Integer.MAX_VALUE), + 256 + ); + public static final ConfigOption LEVEL0_STOP_WRITES_TRIGGER = + new ConfigOption<>( + "rocksdb.level0_stop_writes_trigger", + "Hard limit on number of level-0 files for stopping writes.", + rangeInt(-1, Integer.MAX_VALUE), + 1024 + ); + public static final ConfigOption SOFT_PENDING_COMPACTION_LIMIT = + new ConfigOption<>( + "rocksdb.soft_pending_compaction_bytes_limit", + "The soft limit to impose on pending compaction in bytes.", + rangeInt(Bytes.GB, Long.MAX_VALUE), + 1024L * Bytes.GB + ); + public static final ConfigOption HARD_PENDING_COMPACTION_LIMIT = + new ConfigOption<>( + "rocksdb.hard_pending_compaction_bytes_limit", + "The hard limit to impose on pending compaction in bytes.", + rangeInt(Bytes.GB, Long.MAX_VALUE), + 2048L * Bytes.GB + ); + public static final ConfigOption ALLOW_MMAP_WRITES = + new ConfigOption<>( + "rocksdb.allow_mmap_writes", + "Allow the OS to mmap file for writing.", + disallowEmpty(), + false + ); + public static final ConfigOption ALLOW_MMAP_READS = + new ConfigOption<>( + "rocksdb.allow_mmap_reads", + "Allow the OS to mmap file for reading sst tables.", + disallowEmpty(), + false + ); + public static final ConfigOption USE_DIRECT_READS = + new ConfigOption<>( + "rocksdb.use_direct_reads", + "Enable the OS to use direct I/O for reading sst tables.", + disallowEmpty(), + false + ); + public static final ConfigOption USE_DIRECT_READS_WRITES_FC = + new ConfigOption<>( + "rocksdb.use_direct_io_for_flush_and_compaction", + "Enable the OS to use direct read/writes in flush and compaction.", + disallowEmpty(), + false + ); + public static final ConfigOption PIN_L0_FILTER_AND_INDEX_IN_CACHE = + new ConfigOption<>( + "rocksdb.pin_l0_filter_and_index_blocks_in_cache", + "Indicating if we'd put index/filter blocks to the block cache.", + disallowEmpty(), + true + ); + public static final ConfigOption PUT_FILTER_AND_INDEX_IN_CACHE = + new ConfigOption<>( + "rocksdb.cache_index_and_filter_blocks", + "Indicating if we'd put index/filter blocks to the block cache.", + disallowEmpty(), + true + ); + public static final ConfigOption BLOOM_FILTER_BITS_PER_KEY = + new ConfigOption<>( + "rocksdb.bloom_filter_bits_per_key", + "The bits per key in bloom filter, a good value is 10, " + + "which yields a filter with ~ 1% false positive rate, " + + "-1 means no bloom filter.", + rangeInt(-1, Integer.MAX_VALUE), + -1 + ); + public static final ConfigOption BLOOM_FILTER_MODE = + new ConfigOption<>( + "rocksdb.bloom_filter_block_based_mode", + "Use block based filter rather than full filter.", + disallowEmpty(), + false + ); + public static final ConfigOption BLOOM_FILTER_WHOLE_KEY = + new ConfigOption<>( + "rocksdb.bloom_filter_whole_key_filtering", + "True if place whole keys in the bloom filter, " + + "else place the prefix of keys.", + disallowEmpty(), + true + ); + public static final ConfigOption BLOOM_FILTERS_SKIP_LAST_LEVEL = + new ConfigOption<>( + "rocksdb.optimize_filters_for_hits", + "This flag allows us to not store filters for the last level.", + disallowEmpty(), + false + ); + public static final String BLOCK_TABLE_CONFIG = "rocksdb.block_table_config"; + public static final String WRITE_BUFFER_MANAGER = "rocksdb.write_buffer_manager"; + public static final String BLOCK_CACHE = "rocksdb.block_cache"; + public static final String WRITE_CACHE = "rocksdb.write_cache"; + public static final String ENV = "rocksdb.env"; + private static volatile RocksDBOptions instance; + + private RocksDBOptions() { + super(); + } + + public static synchronized RocksDBOptions instance() { + if (instance == null) { + instance = new RocksDBOptions(); + instance.registerOptions(); + } + return instance; + } +} diff --git a/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/RocksDBScanIterator.java b/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/RocksDBScanIterator.java new file mode 100644 index 0000000000..ff255d9ea9 --- /dev/null +++ b/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/RocksDBScanIterator.java @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.rocksdb.access; + +import java.util.Arrays; +import java.util.NoSuchElementException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hugegraph.rocksdb.access.RocksDBSession.BackendColumn; +import org.apache.hugegraph.util.Bytes; +import org.apache.hugegraph.util.E; +import org.rocksdb.RocksIterator; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class RocksDBScanIterator implements ScanIterator { + + private static final byte[] EMPTY_VALUE = new byte[0]; + private final RocksIterator rawIt; + private final byte[] keyBegin; + private final byte[] keyEnd; + private final int scanType; + + private final AtomicBoolean closed = new AtomicBoolean(false); + private final RocksDBSession.RefCounter iterReference; + private byte[] key; + private boolean matched; + + public RocksDBScanIterator(RocksIterator rawIt, byte[] keyBegin, byte[] keyEnd, + int scanType, RocksDBSession.RefCounter iterReference) { + this.rawIt = rawIt; + this.keyBegin = keyBegin; + this.keyEnd = keyEnd; + this.scanType = scanType; + + this.key = keyBegin; + this.matched = false; + this.iterReference = iterReference; + this.seek(); + } + + private void checkArguments() { + E.checkArgument(!(this.match(ScanIterator.Trait.SCAN_PREFIX_BEGIN) && + this.match(ScanIterator.Trait.SCAN_PREFIX_END)), + "Can't set SCAN_PREFIX_WITH_BEGIN and " + + "SCAN_PREFIX_WITH_END at the same time"); + + E.checkArgument(!(this.match(ScanIterator.Trait.SCAN_PREFIX_BEGIN) && + this.match(ScanIterator.Trait.SCAN_GT_BEGIN)), + "Can't set SCAN_PREFIX_WITH_BEGIN and " + + "SCAN_GT_BEGIN/SCAN_GTE_BEGIN at the same time"); + + E.checkArgument(!(this.match(ScanIterator.Trait.SCAN_PREFIX_END) && + this.match(ScanIterator.Trait.SCAN_LT_END)), + "Can't set SCAN_PREFIX_WITH_END and " + + "SCAN_LT_END/SCAN_LTE_END at the same time"); + + if (this.match(ScanIterator.Trait.SCAN_PREFIX_BEGIN)) { + E.checkArgument(this.keyBegin != null, + "Parameter `keyBegin` can't be null " + + "if set SCAN_PREFIX_WITH_BEGIN"); + E.checkArgument(this.keyEnd == null, + "Parameter `keyEnd` must be null " + + "if set SCAN_PREFIX_WITH_BEGIN"); + } + + if (this.match(ScanIterator.Trait.SCAN_PREFIX_END)) { + E.checkArgument(this.keyEnd != null, + "Parameter `keyEnd` can't be null " + + "if set SCAN_PREFIX_WITH_END"); + } + + if (this.match(ScanIterator.Trait.SCAN_GT_BEGIN)) { + E.checkArgument(this.keyBegin != null, + "Parameter `keyBegin` can't be null " + + "if set SCAN_GT_BEGIN or SCAN_GTE_BEGIN"); + } + + if (this.match(ScanIterator.Trait.SCAN_LT_END)) { + E.checkArgument(this.keyEnd != null, + "Parameter `keyEnd` can't be null " + + "if set SCAN_LT_END or SCAN_LTE_END"); + } + } + + @Override + public boolean hasNext() { + + if (this.closed.get()) { + //log.warn("Iterator has been closed"); + return false; + } + this.matched = this.rawIt.isOwningHandle(); + if (!this.matched) { + // Maybe closed + return this.matched; + } + + this.matched = this.rawIt.isValid(); + if (this.matched) { + // Update position for paging + this.key = this.rawIt.key(); + this.matched = this.filter(this.key); + } + if (!this.matched) { + // The end + this.key = null; + // Free the iterator if finished + this.close(); + } + return this.matched; + } + + private void seek() { + if (this.closed.get()) { + log.warn("Iterator has been closed"); + return; + } + if (this.keyBegin == null) { + // Seek to the first if no `keyBegin` + this.rawIt.seekToFirst(); + } else { + /* + * Seek to `keyBegin`: + * if set SCAN_GT_BEGIN/SCAN_GTE_BEGIN (key > / >= 'xx') + * or if set SCAN_PREFIX_WITH_BEGIN (key prefix with 'xx') + */ + this.rawIt.seek(this.keyBegin); + + // Skip `keyBegin` if set SCAN_GT_BEGIN (key > 'xx') + if (this.match(ScanIterator.Trait.SCAN_GT_BEGIN) && + !this.match(ScanIterator.Trait.SCAN_GTE_BEGIN)) { + while (this.rawIt.isValid() && + Bytes.equals(this.rawIt.key(), this.keyBegin)) { + this.rawIt.next(); + } + } + } + } + + @Override + public boolean isValid() { + return this.rawIt.isValid(); + } + + @Override + public BackendColumn next() { + if (this.closed.get()) { + log.warn("Iterator has been closed"); + throw new NoSuchElementException(); + } + if (!this.matched) { + if (!this.hasNext()) { + throw new NoSuchElementException(); + } + } + + BackendColumn col = BackendColumn.of(this.key, + this.match(Trait.SCAN_KEYONLY) ? EMPTY_VALUE : + this.rawIt.value()); + + this.rawIt.next(); + this.matched = false; + + return col; + } + + @Override + public long count() { + long count = 0L; + while (this.hasNext()) { + this.rawIt.next(); + count++; + } + return count; + } + + @Override + public byte[] position() { + return this.key; + } + + private boolean filter(byte[] v) { + + if (this.match(ScanIterator.Trait.SCAN_PREFIX_BEGIN)) { + return Bytes.prefixWith(v, this.keyBegin); + + } else if (this.match(ScanIterator.Trait.SCAN_PREFIX_END)) { + assert this.keyEnd != null; + return Bytes.prefixWith(v, this.keyEnd); + + } else if (this.match(ScanIterator.Trait.SCAN_LT_END)) { + assert this.keyEnd != null; + + if (this.match(ScanIterator.Trait.SCAN_LTE_END)) { + v = Arrays.copyOfRange(v, 0, this.keyEnd.length); + return Bytes.compare(v, this.keyEnd) <= 0; + } else { + return Bytes.compare(v, this.keyEnd) < 0; + } + } else { + + return true; + } + } + + @Override + public void close() { + if (!this.closed.getAndSet(true)) { + if (this.rawIt.isOwningHandle()) { + this.rawIt.close(); + } + this.iterReference.release(); + } + } + + private boolean match(int expected) { + return (expected & this.scanType) == expected; + } + +} diff --git a/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/RocksDBSession.java b/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/RocksDBSession.java new file mode 100644 index 0000000000..fe27183f18 --- /dev/null +++ b/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/RocksDBSession.java @@ -0,0 +1,1054 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.rocksdb.access; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; + +import org.apache.commons.io.FileUtils; +import org.apache.hugegraph.config.HugeConfig; +import org.apache.hugegraph.rocksdb.access.util.Asserts; +import org.apache.hugegraph.store.term.HgPair; +import org.apache.hugegraph.util.Bytes; +import org.apache.hugegraph.util.E; +import org.rocksdb.BlockBasedTableConfig; +import org.rocksdb.Checkpoint; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.ColumnFamilyOptionsInterface; +import org.rocksdb.CompressionType; +import org.rocksdb.DBOptions; +import org.rocksdb.DBOptionsInterface; +import org.rocksdb.Env; +import org.rocksdb.FlushOptions; +import org.rocksdb.IngestExternalFileOptions; +import org.rocksdb.MutableColumnFamilyOptionsInterface; +import org.rocksdb.MutableDBOptionsInterface; +import org.rocksdb.Options; +import org.rocksdb.Range; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.SizeApproximationFlag; +import org.rocksdb.Slice; +import org.rocksdb.Statistics; +import org.rocksdb.WriteBufferManager; +import org.rocksdb.WriteOptions; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class RocksDBSession implements AutoCloseable, Cloneable { + + private static final int CPUS = Runtime.getRuntime().availableProcessors(); + final Statistics rocksDbStats; + final WriteOptions writeOptions; + final AtomicInteger refCount; + final AtomicBoolean shutdown; + final String tempSuffix = "_temp_"; + private final transient String graphName; + private final HugeConfig hugeConfig; + private final ReentrantReadWriteLock cfHandleLock; + private final Map tables; + private transient String dbPath; + private RocksDB rocksDB; + private DBOptions dbOptions; + private volatile boolean closed = false; + + public RocksDBSession(HugeConfig hugeConfig, String dbDataPath, String graphName, + long version) { + this.hugeConfig = hugeConfig; + this.graphName = graphName; + this.cfHandleLock = new ReentrantReadWriteLock(); + this.tables = new ConcurrentHashMap<>(); + this.refCount = new AtomicInteger(1); + this.shutdown = new AtomicBoolean(false); + this.writeOptions = new WriteOptions(); + this.rocksDbStats = new Statistics(); + openRocksDB(dbDataPath, version); + } + + private RocksDBSession(RocksDBSession origin) { + this.hugeConfig = origin.hugeConfig; + this.graphName = origin.graphName; + this.cfHandleLock = origin.cfHandleLock; + this.tables = origin.tables; + this.dbPath = origin.dbPath; + this.rocksDB = origin.rocksDB; + this.dbOptions = origin.dbOptions; + this.writeOptions = origin.writeOptions; + this.rocksDbStats = origin.rocksDbStats; + this.shutdown = origin.shutdown; + this.refCount = origin.refCount; + this.refCount.incrementAndGet(); + } + + /** + * create directory + */ + public static boolean createDirectory(String dirPath) { + Asserts.isTrue(!dirPath.isEmpty(), "dir path is empty"); + + File dirObject = new File(dirPath); + if (!dirObject.exists() || !dirObject.isDirectory()) { + dirObject.mkdirs(); + } + + return true; + } + + public static void initOptions(HugeConfig conf, + DBOptionsInterface db, + MutableDBOptionsInterface mdb, + ColumnFamilyOptionsInterface cf, + MutableColumnFamilyOptionsInterface mcf) { + final boolean optimize = conf.get(RocksDBOptions.OPTIMIZE_MODE); + if (db != null) { + /* + * Set true then the database will be created if it is missing. + * should we use options.setCreateMissingColumnFamilies()? + */ + db.setCreateIfMissing(true); + if (optimize) { + int processors = RocksDBSession.CPUS; + db.setIncreaseParallelism(Math.max(processors / 2, 1)); + db.setAllowConcurrentMemtableWrite(true); + db.setEnableWriteThreadAdaptiveYield(true); + } + db.setInfoLogLevel(conf.get(RocksDBOptions.LOG_LEVEL)); + db.setMaxSubcompactions(conf.get(RocksDBOptions.MAX_SUB_COMPACTIONS)); + db.setAllowMmapWrites(conf.get(RocksDBOptions.ALLOW_MMAP_WRITES)); + db.setAllowMmapReads(conf.get(RocksDBOptions.ALLOW_MMAP_READS)); + db.setUseDirectReads(conf.get(RocksDBOptions.USE_DIRECT_READS)); + db.setUseDirectIoForFlushAndCompaction( + conf.get(RocksDBOptions.USE_DIRECT_READS_WRITES_FC)); + db.setMaxManifestFileSize(conf.get(RocksDBOptions.MAX_MANIFEST_FILE_SIZE)); + db.setSkipStatsUpdateOnDbOpen(conf.get(RocksDBOptions.SKIP_STATS_UPDATE_ON_DB_OPEN)); + db.setMaxFileOpeningThreads(conf.get(RocksDBOptions.MAX_FILE_OPENING_THREADS)); + db.setDbWriteBufferSize(conf.get(RocksDBOptions.DB_MEMTABLE_SIZE)); + WriteBufferManager bufferManager = + (WriteBufferManager) conf.getProperty(RocksDBOptions.WRITE_BUFFER_MANAGER); + if (bufferManager != null) { + db.setWriteBufferManager(bufferManager); + log.info("rocksdb use global WriteBufferManager"); + } + Env env = (Env) conf.getProperty(RocksDBOptions.ENV); + if (env != null) { + db.setEnv(env); + } + } + + if (mdb != null) { + /* + * Migrate to max_background_jobs option + * https://github.com/facebook/rocksdb/wiki/Thread-Pool + * https://github.com/facebook/rocksdb/pull/2205/files + */ + mdb.setMaxBackgroundJobs(conf.get(RocksDBOptions.MAX_BG_JOBS)); + mdb.setDelayedWriteRate(conf.get(RocksDBOptions.DELAYED_WRITE_RATE)); + mdb.setMaxOpenFiles(conf.get(RocksDBOptions.MAX_OPEN_FILES)); + mdb.setMaxTotalWalSize(conf.get(RocksDBOptions.MAX_TOTAL_WAL_SIZE)); + mdb.setDeleteObsoleteFilesPeriodMicros( + 1000000 * conf.get(RocksDBOptions.DELETE_OBSOLETE_FILE_PERIOD)); + } + + if (cf != null) { + if (optimize) { + // Optimize RocksDB + cf.optimizeLevelStyleCompaction(); + cf.optimizeUniversalStyleCompaction(); + } + + int numLevels = conf.get(RocksDBOptions.NUM_LEVELS); + List compressions = conf.get( + RocksDBOptions.LEVELS_COMPRESSIONS); + E.checkArgument(compressions.isEmpty() || + compressions.size() == numLevels, + "Elements number of '%s' must be 0 or " + + "be the same as '%s', but got %s != %s", + RocksDBOptions.LEVELS_COMPRESSIONS.name(), + RocksDBOptions.NUM_LEVELS.name(), + compressions.size(), numLevels); + + cf.setNumLevels(numLevels); + cf.setCompactionStyle(conf.get(RocksDBOptions.COMPACTION_STYLE)); + + cf.setBottommostCompressionType( + conf.get(RocksDBOptions.BOTTOMMOST_COMPRESSION)); + if (!compressions.isEmpty()) { + cf.setCompressionPerLevel(compressions); + } + + cf.setMinWriteBufferNumberToMerge( + conf.get(RocksDBOptions.MIN_MEMTABLES_TO_MERGE)); + cf.setMaxWriteBufferNumberToMaintain( + conf.get(RocksDBOptions.MAX_MEMTABLES_TO_MAINTAIN)); + + cf.setLevelCompactionDynamicLevelBytes( + conf.get(RocksDBOptions.DYNAMIC_LEVEL_BYTES)); + + // https://github.com/facebook/rocksdb/wiki/Block-Cache + BlockBasedTableConfig tableConfig = + (BlockBasedTableConfig) conf.getProperty(RocksDBOptions.BLOCK_TABLE_CONFIG); + + if (tableConfig != null) { + cf.setTableFormatConfig(tableConfig); + } + + cf.setOptimizeFiltersForHits( + conf.get(RocksDBOptions.BLOOM_FILTERS_SKIP_LAST_LEVEL)); + + // https://github.com/facebook/rocksdb/tree/master/utilities/merge_operators + cf.setMergeOperatorName("uint64add"); // uint64add/stringappend + } + + if (mcf != null) { + mcf.setCompressionType(conf.get(RocksDBOptions.COMPRESSION)); + + mcf.setWriteBufferSize(conf.get(RocksDBOptions.MEMTABLE_SIZE)); + mcf.setMaxWriteBufferNumber(conf.get(RocksDBOptions.MAX_MEMTABLES)); + + mcf.setMaxBytesForLevelBase( + conf.get(RocksDBOptions.MAX_LEVEL1_BYTES)); + mcf.setMaxBytesForLevelMultiplier( + conf.get(RocksDBOptions.MAX_LEVEL_BYTES_MULTIPLIER)); + + mcf.setTargetFileSizeBase( + conf.get(RocksDBOptions.TARGET_FILE_SIZE_BASE)); + mcf.setTargetFileSizeMultiplier( + conf.get(RocksDBOptions.TARGET_FILE_SIZE_MULTIPLIER)); + + mcf.setLevel0FileNumCompactionTrigger( + conf.get(RocksDBOptions.LEVEL0_COMPACTION_TRIGGER)); + mcf.setLevel0SlowdownWritesTrigger( + conf.get(RocksDBOptions.LEVEL0_SLOWDOWN_WRITES_TRIGGER)); + mcf.setLevel0StopWritesTrigger( + conf.get(RocksDBOptions.LEVEL0_STOP_WRITES_TRIGGER)); + + mcf.setSoftPendingCompactionBytesLimit( + conf.get(RocksDBOptions.SOFT_PENDING_COMPACTION_LIMIT)); + mcf.setHardPendingCompactionBytesLimit( + conf.get(RocksDBOptions.HARD_PENDING_COMPACTION_LIMIT)); + + // conf.get(RocksDBOptions.BULKLOAD_MODE); + } + } + + @Override + public RocksDBSession clone() { + return new RocksDBSession(this); + } + + public RocksDB getDB() { + return this.rocksDB; + } + + public HugeConfig getHugeConfig() { + return this.hugeConfig; + } + + public String getGraphName() { + return this.graphName; + } + + public WriteOptions getWriteOptions() { + return this.writeOptions; + } + + public Map getTables() { + return tables; + } + + public ReentrantReadWriteLock.ReadLock getCfHandleReadLock() { + return this.cfHandleLock.readLock(); + } + + private String findLatestDBPath(String path, long version) { + File file = new File(path); + int strIndex = file.getName().indexOf("_"); + + String defaultName; + if (strIndex < 0) { + defaultName = file.getName(); + } else { + defaultName = file.getName().substring(0, strIndex); + } + String prefix = defaultName + "_"; + File parentFile = new File(file.getParent()); + ArrayList> dbs = new ArrayList<>(); + File[] files = parentFile.listFiles(); + if (files != null) { + dbs.ensureCapacity(files.length); + // search all db path + for (final File sFile : files) { + final String name = sFile.getName(); + if (!name.startsWith(prefix) && !name.equals(defaultName)) { + continue; + } + if (name.endsWith(tempSuffix)) { + continue; + } + long v1 = -1L; + long v2 = -1L; + if (name.length() > defaultName.length()) { + String[] versions = name.substring(prefix.length()).split("_"); + if (versions.length == 1) { + v1 = Long.parseLong(versions[0]); + } else if (versions.length == 2) { + v1 = Long.parseLong(versions[0]); + v2 = Long.parseLong(versions[1]); + } else { + continue; + } + } + dbs.add(new HgPair<>(v1, v2)); + } + } + + RocksDBFactory factory = RocksDBFactory.getInstance(); + // get last index db path + String latestDBPath = ""; + if (!dbs.isEmpty()) { + + dbs.sort((o1, o2) -> o1.getKey().equals(o2.getKey()) ? + o1.getValue().compareTo(o2.getValue()) : + o1.getKey().compareTo(o2.getKey())); + final int dbCount = dbs.size(); + for (int i = 0; i < dbCount; i++) { + final HgPair pair = dbs.get(i); + String curDBName; + if (pair.getKey() == -1L) { + curDBName = defaultName; + } else if (pair.getValue() == -1L) { + curDBName = String.format("%s_%d", defaultName, pair.getKey()); + } else { + curDBName = + String.format("%s_%d_%d", defaultName, pair.getKey(), pair.getValue()); + } + String curDBPath = Paths.get(parentFile.getPath(), curDBName).toString(); + if (i == dbCount - 1) { + latestDBPath = curDBPath; + } else { + // delete old db,在删除队列的文件不要删除 + if (!factory.findPathInRemovedList(curDBPath)) { + try { + FileUtils.deleteDirectory(new File(curDBPath)); + log.info("delete old dbpath {}", curDBPath); + } catch (IOException e) { + log.error("fail to delete old dbpath {}", curDBPath, e); + } + } + } + } + } else { + latestDBPath = Paths.get(parentFile.getPath(), defaultName).toString(); + } + if (factory.findPathInRemovedList(latestDBPath)) { + // 已经被删除,创建新的目录 + latestDBPath = + Paths.get(parentFile.getPath(), String.format("%s_%d", defaultName, version)) + .toString(); + } + // + log.info("{} latest db path {}", this.graphName, latestDBPath); + return latestDBPath; + } + + public boolean setDisableWAL(final boolean flag) { + this.writeOptions.setSync(!flag); + return this.writeOptions.setDisableWAL(flag).disableWAL(); + } + + public void checkTable(String table) { + try { + ColumnFamilyHandle handle = tables.get(table); + if (handle == null) { + createTable(table); + } + } catch (RocksDBException e) { + throw new DBStoreException(e); + } + } + + public boolean tableIsExist(String table) { + return this.tables.containsKey(table); + } + + private void openRocksDB(String dbDataPath, long version) { + + if (dbDataPath.endsWith(File.separator)) { + this.dbPath = dbDataPath + this.graphName; + } else { + this.dbPath = dbDataPath + File.separator + this.graphName; + } + + this.dbPath = findLatestDBPath(dbPath, version); + + Asserts.isTrue((dbPath != null), + () -> new DBStoreException("the data-path of RocksDB is null")); + + //makedir for rocksdb + createDirectory(dbPath); + + Options opts = new Options(); + RocksDBSession.initOptions(hugeConfig, opts, opts, opts, opts); + dbOptions = new DBOptions(opts); + dbOptions.setStatistics(rocksDbStats); + + try { + List columnFamilyDescriptorList = + new ArrayList<>(); + List columnFamilyBytes = RocksDB.listColumnFamilies(new Options(), dbPath); + + ColumnFamilyOptions cfOptions = new ColumnFamilyOptions(); + RocksDBSession.initOptions(this.hugeConfig, null, null, cfOptions, cfOptions); + + if (columnFamilyBytes.size() > 0) { + for (byte[] columnFamilyByte : columnFamilyBytes) { + columnFamilyDescriptorList.add( + new ColumnFamilyDescriptor(columnFamilyByte, cfOptions)); + } + } else { + columnFamilyDescriptorList.add( + new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOptions)); + } + List columnFamilyHandleList = new ArrayList<>(); + this.rocksDB = RocksDB.open(dbOptions, dbPath, columnFamilyDescriptorList, + columnFamilyHandleList); + Asserts.isTrue(columnFamilyHandleList.size() > 0, "must have column family"); + + for (ColumnFamilyHandle handle : columnFamilyHandleList) { + this.tables.put(new String(handle.getDescriptor().getName()), handle); + } + } catch (RocksDBException e) { + throw new DBStoreException(e); + } + } + + private ColumnFamilyHandle createTable(String table) throws RocksDBException { + cfHandleLock.writeLock().lock(); + try { + ColumnFamilyHandle handle = tables.get(table); + if (handle == null) { + ColumnFamilyOptions cfOptions = new ColumnFamilyOptions(); + RocksDBSession.initOptions(this.hugeConfig, null, null, cfOptions, cfOptions); + ColumnFamilyDescriptor cfDescriptor = + new ColumnFamilyDescriptor(table.getBytes(), cfOptions); + handle = this.rocksDB.createColumnFamily(cfDescriptor); + tables.put(table, handle); + } + return handle; + } finally { + cfHandleLock.writeLock().unlock(); + } + } + + public ColumnFamilyHandle getCF(String table) { + ColumnFamilyHandle handle = this.tables.get(table); + try { + if (handle == null) { + getCfHandleReadLock().unlock(); + handle = createTable(table); + getCfHandleReadLock().lock(); + } + } catch (RocksDBException e) { + throw new DBStoreException(e); + } + return handle; + } + + public CFHandleLock getCFHandleLock(String table) { + this.cfHandleLock.readLock().lock(); + ColumnFamilyHandle handle = this.tables.get(table); + try { + if (handle == null) { + this.cfHandleLock.readLock().unlock(); + handle = createTable(table); + this.cfHandleLock.readLock().lock(); + handle = this.tables.get(table); + } + } catch (RocksDBException e) { + throw new DBStoreException(e); + } + return new CFHandleLock(handle, this.cfHandleLock); + } + + @Override + public String toString() { + return "RocksDBSession"; + } + + /** + * Delete all data of table + * + * @param tables + * @throws DBStoreException + */ + public void deleteTables(String... tables) throws DBStoreException { + dropTables(tables); + createTables(tables); + } + + public void createTables(String... tables) throws DBStoreException { + if (!this.rocksDB.isOwningHandle()) { + return; + } + cfHandleLock.writeLock().lock(); + try { + List cfList = new ArrayList<>(); + for (String table : tables) { + if (this.tables.get(table) != null) { + continue; + } + + ColumnFamilyDescriptor cfDescriptor = new ColumnFamilyDescriptor(table.getBytes()); + cfList.add(cfDescriptor); + } + + if (cfList.size() > 0) { + List cfHandles = this.rocksDB.createColumnFamilies(cfList); + for (ColumnFamilyHandle handle : cfHandles) { + this.tables.put(new String(handle.getDescriptor().getName()), handle); + } + } + + } catch (RocksDBException e) { + throw new DBStoreException(e); + } finally { + cfHandleLock.writeLock().unlock(); + } + + } + + public void dropTables(String... tables) throws DBStoreException { + if (!this.rocksDB.isOwningHandle()) { + return; + } + + cfHandleLock.writeLock().lock(); + try { + List cfHandles = new ArrayList<>(); + for (String table : tables) { + ColumnFamilyHandle handle = this.tables.get(table); + if (handle != null) { + cfHandles.add(handle); + } else { + this.tables.remove(table); + } + } + + if (cfHandles.size() > 0) { + this.rocksDB.dropColumnFamilies(cfHandles); + } + + for (ColumnFamilyHandle h : cfHandles) { + String tName = new String(h.getDescriptor().getName()); + h.close(); + log.info("drop table: {}", tName); + this.tables.remove(tName); + } + + } catch (RocksDBException e) { + throw new DBStoreException(e); + } finally { + cfHandleLock.writeLock().unlock(); + } + } + + public synchronized void truncate() { + Set tableNames = this.tables.keySet(); + String defaultCF = new String(RocksDB.DEFAULT_COLUMN_FAMILY); + tableNames.remove(defaultCF); + + log.info("truncate table: {}", String.join(",", tableNames)); + this.dropTables(tableNames.toArray(new String[0])); + this.createTables(tableNames.toArray(new String[0])); + } + + public void flush(boolean wait) { + cfHandleLock.readLock().lock(); + try { + rocksDB.flush(new FlushOptions().setWaitForFlush(wait), + tables.entrySet() + .stream().map(e -> e.getValue()) + .collect(Collectors.toList())); + + } catch (RocksDBException e) { + throw new DBStoreException(e); + } finally { + cfHandleLock.readLock().unlock(); + } + } + + void shutdown() { + if (!shutdown.compareAndSet(false, true)) { + return; + } + log.info("shutdown db {}, path is {} ", getGraphName(), getDbPath()); + + cfHandleLock.writeLock().lock(); + try { + this.tables.forEach((k, v) -> { + v.close(); + }); + this.tables.clear(); + + if (rocksDB != null) { + try { + this.rocksDB.syncWal(); + } catch (RocksDBException e) { + log.warn("exception ", e); + } + this.rocksDB.close(); + } + rocksDB = null; + if (dbOptions != null) { + this.dbOptions.close(); + this.writeOptions.close(); + this.rocksDbStats.close(); + dbOptions = null; + } + } finally { + cfHandleLock.writeLock().unlock(); + } + } + + public SessionOperator sessionOp() { + return new SessionOperatorImpl(this); + } + + public long getLatestSequenceNumber() { + return rocksDB.getLatestSequenceNumber(); + } + + public Statistics getRocksDbStats() { + return rocksDbStats; + } + + public void saveSnapshot(String snapshotPath) throws DBStoreException { + long startTime = System.currentTimeMillis(); + log.info("begin save snapshot at {}", snapshotPath); + cfHandleLock.readLock().lock(); + try (final Checkpoint checkpoint = Checkpoint.create(this.rocksDB)) { + final String tempPath = Paths.get(snapshotPath) + "_temp"; + final File tempFile = new File(tempPath); + FileUtils.deleteDirectory(tempFile); + checkpoint.createCheckpoint(tempPath); + final File snapshotFile = new File(snapshotPath); + try { + FileUtils.deleteDirectory(snapshotFile); + FileUtils.moveDirectory(tempFile, snapshotFile); + } catch (IOException e) { + log.error("Fail to rename {} to {}", tempPath, snapshotPath, e); + throw new DBStoreException( + String.format("Fail to rename %s to %s", tempPath, snapshotPath)); + } + } catch (final DBStoreException e) { + throw e; + } catch (final Exception e) { + log.error("Fail to write snapshot at path: {}", snapshotPath, e); + throw new DBStoreException( + String.format("Fail to write snapshot at path %s", snapshotPath)); + } finally { + cfHandleLock.readLock().unlock(); + } + log.info("saved snapshot into {}, time cost {} ms", snapshotPath, + System.currentTimeMillis() - startTime); + } + + private boolean verifySnapshot(String snapshotPath) { + try { + try (final Options options = new Options(); + final ColumnFamilyOptions cfOptions = new ColumnFamilyOptions()) { + List columnFamilyDescriptorList = new ArrayList<>(); + List columnFamilyBytes = RocksDB.listColumnFamilies(options, snapshotPath); + + if (columnFamilyBytes.size() > 0) { + for (byte[] columnFamilyByte : columnFamilyBytes) { + columnFamilyDescriptorList.add( + new ColumnFamilyDescriptor(columnFamilyByte, cfOptions)); + } + } else { + columnFamilyDescriptorList.add( + new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOptions)); + } + List columnFamilyHandleList = new ArrayList<>(); + + try (final DBOptions dbOptions = new DBOptions(); + final RocksDB db = RocksDB.openReadOnly(dbOptions, snapshotPath, + columnFamilyDescriptorList, + columnFamilyHandleList)) { + for (ColumnFamilyHandle handle : columnFamilyHandleList) { + if (handle == null) { + log.error("verifySnapshot some ColumnFamilyHandle is null in {}", + snapshotPath); + return false; + } + } + } + } + log.info("verifySnapshot {} Ok", snapshotPath); + } catch (RocksDBException e) { + log.error("verifySnapshot {} failed. {}", snapshotPath, e.toString()); + return false; + } + return true; + } + + public void loadSnapshot(String snapshotPath, long v1) throws DBStoreException { + long startTime = System.currentTimeMillis(); + log.info("begin load snapshot from {}", snapshotPath); + cfHandleLock.writeLock().lock(); + try { + final File snapshotFile = new File(snapshotPath); + if (!snapshotFile.exists()) { + log.error("Snapshot file {} not exists.", snapshotPath); + throw new DBStoreException( + String.format("Snapshot file %s not exists", snapshotPath)); + } + + // replace rocksdb data with snapshot data + File dbFile = new File(this.dbPath); + String parent = dbFile.getParent(); + String defaultName = dbFile.getName().split("_")[0]; + String defaultPath = Paths.get(parent, defaultName).toString(); + String newDBPath = String.format("%s_%d", defaultPath, v1); + String tempDBPath = newDBPath + tempSuffix; + + // first link snapshot to temp dir + try { + final File tempFile = new File(tempDBPath); + FileUtils.deleteDirectory(tempFile); + FileUtils.forceMkdir(tempFile); + File[] fs = snapshotFile.listFiles(); + for (File f : fs) { + if (!f.isDirectory()) { + File target = Paths.get(tempFile.getPath(), f.getName()).toFile(); + // create hard link + try { + Files.createLink(target.toPath(), f.toPath()); + } catch (IOException e) { + log.error("link failed, {} -> {}, error:{}", + f.getAbsolutePath(), + target.getAbsolutePath(), + e.getMessage()); + // diff disk + Files.copy(f.toPath(), target.toPath()); + } + } + } + } catch (IOException e) { + log.error("Fail to copy {} to {}. {}", snapshotPath, tempDBPath, e.toString()); + try { + FileUtils.deleteDirectory(new File(tempDBPath)); + } catch (IOException e2) { + log.error("Fail to delete directory {}. {}", tempDBPath, e2.toString()); + } + throw new DBStoreException( + String.format("Fail to copy %s to %s", snapshotPath, tempDBPath)); + } + // verify temp path + if (!verifySnapshot(tempDBPath)) { + throw new DBStoreException( + String.format("failed to verify snapshot %s", tempDBPath)); + } + + // move temp to newDBPath + try { + FileUtils.deleteDirectory(new File(newDBPath)); + FileUtils.moveDirectory(new File(tempDBPath), new File(newDBPath)); + } catch (IOException e) { + log.error("Fail to copy {} to {}. {}", snapshotPath, tempDBPath, e.toString()); + throw new DBStoreException( + String.format("Fail to move %s to %s", tempDBPath, newDBPath)); + } + } catch (final DBStoreException e) { + throw e; + } catch (final Exception e) { + log.error("failed to load snapshot from {}", snapshotPath); + throw new DBStoreException( + String.format("Fail to write snapshot at path %s", snapshotPath)); + } finally { + cfHandleLock.writeLock().unlock(); + } + log.info("loaded snapshot from {}, time cost {} ms", snapshotPath, + System.currentTimeMillis() - startTime); + } + + /** + * @param : + * @return the approximate size of the data. + */ + public long getApproximateDataSize() { + return getApproximateDataSize("\0".getBytes(StandardCharsets.UTF_8), + "\255".getBytes(StandardCharsets.UTF_8)); + } + + public long getEstimateNumKeys() { + cfHandleLock.readLock().lock(); + try { + long totalCount = 0; + for (ColumnFamilyHandle h : this.tables.values()) { + long count = this.rocksDB.getLongProperty(h, "rocksdb.estimate-num-keys"); + totalCount += count; + } + return totalCount; + } catch (RocksDBException e) { + throw new DBStoreException(e); + } finally { + cfHandleLock.readLock().unlock(); + } + } + + /** + * @param : + * @return the approximate size of the data. N KB. + */ + public long getApproximateDataSize(byte[] start, byte[] end) { + cfHandleLock.readLock().lock(); + try { + long kbSize = 0; + long bytesSize = 0; + Range r1 = new Range(new Slice(start), new Slice(end)); + for (ColumnFamilyHandle h : this.tables.values()) { + long[] sizes = this.rocksDB.getApproximateSizes( + h, + List.of(r1), + SizeApproximationFlag.INCLUDE_FILES, + SizeApproximationFlag.INCLUDE_MEMTABLES); + + bytesSize += sizes[0]; + kbSize += bytesSize / 1024; + bytesSize = bytesSize % 1024; + } + if (bytesSize != 0) { + kbSize += 1; + } + return kbSize; + } finally { + cfHandleLock.readLock().unlock(); + } + } + + public Map getApproximateCFDataSize(byte[] start, byte[] end) { + Map map = new ConcurrentHashMap<>(this.tables.size()); + cfHandleLock.readLock().lock(); + try { + Range r1 = new Range(new Slice(start), new Slice(end)); + for (ColumnFamilyHandle h : this.tables.values()) { + long[] sizes = this.rocksDB.getApproximateSizes( + h, + List.of(r1), + SizeApproximationFlag.INCLUDE_FILES, + SizeApproximationFlag.INCLUDE_MEMTABLES); + String name = new String(h.getDescriptor().getName()); + String size = FileUtils.byteCountToDisplaySize(sizes[0]); + map.put(name, size); + } + } catch (RocksDBException e) { + throw new DBStoreException(e); + } finally { + cfHandleLock.readLock().unlock(); + } + return map; + } + + public Map getKeyCountPerCF(byte[] start, byte[] end, boolean accurate) throws + DBStoreException { + ConcurrentHashMap map = new ConcurrentHashMap<>(this.tables.size()); + cfHandleLock.readLock().lock(); + try { + SessionOperator op = this.sessionOp(); + for (ColumnFamilyHandle h : this.tables.values()) { + String name = new String(h.getDescriptor().getName()); + long count = 0; + if (accurate) { + count = op.keyCount(start, end, name); + } else { + count = op.estimatedKeyCount(name); + } + map.put(name, count); + } + } catch (RocksDBException e) { + throw new DBStoreException(e); + } finally { + cfHandleLock.readLock().unlock(); + } + return map; + } + + public String getDbPath() { + return this.dbPath; + } + + public void ingestSstFile(Map> sstFiles) { + long startTime = System.currentTimeMillis(); + log.info("begin ingestSstFile. graphName {}", this.graphName); + try { + for (Map.Entry> entry : sstFiles.entrySet()) { + String cfName = new String(entry.getKey()); + try (CFHandleLock cfHandle = this.getCFHandleLock(cfName)) { + try (final IngestExternalFileOptions ingestOptions = + new IngestExternalFileOptions() + .setMoveFiles(true)) { + this.rocksDB.ingestExternalFile(cfHandle.get(), entry.getValue(), + ingestOptions); + log.info("Rocksdb {} ingestSstFile cf:{}, sst: {}", this.graphName, cfName, + entry.getValue()); + } + } + } + } catch (RocksDBException e) { + throw new DBStoreException("Rocksdb ingestSstFile error " + this.graphName, e); + } + log.info("end ingestSstFile. graphName {}, time cost {} ms", this.graphName, + System.currentTimeMillis() - startTime); + } + + public String getProperty(String property) { + try { + return rocksDB.getProperty(property); + } catch (RocksDBException e) { + log.error("getProperty exception {}", e.getMessage()); + return "0"; + } + } + + /** + * @return true when all iterators were closed. + */ + public int getRefCount() { + return this.refCount.get(); + } + + public void forceResetRefCount() { + this.refCount.set(0); + } + + @Override + public synchronized void close() { + if (closed) { + log.warn("RocksDBSession has been closed!"); + return; + } + try { + if (this.refCount.decrementAndGet() > 0) { + return; + } + } finally { + closed = true; + } + + assert this.refCount.get() == 0; + this.shutdown(); + } + + RefCounter getRefCounter() { + return new RefCounter(this.refCount); + } + + public static class CFHandleLock implements Closeable { + + private final ColumnFamilyHandle handle; + private final ReentrantReadWriteLock lock; + + public CFHandleLock(ColumnFamilyHandle handle, ReentrantReadWriteLock lock) { + this.handle = handle; + this.lock = lock; + } + + @Override + public void close() { + lock.readLock().unlock(); + } + + public ColumnFamilyHandle get() { + return handle; + } + } + + /** + * A wrapper for RocksIterator that convert RocksDB results to std Iterator + */ + + public static class BackendColumn implements Comparable { + + public byte[] name; + public byte[] value; + + public static BackendColumn of(byte[] name, byte[] value) { + BackendColumn col = new BackendColumn(); + col.name = name; + col.value = value; + return col; + } + + @Override + public String toString() { + return String.format("%s=%s", + new String(name, StandardCharsets.UTF_8), + new String(value, StandardCharsets.UTF_8)); + } + + @Override + public int compareTo(BackendColumn other) { + if (other == null) { + return 1; + } + return Bytes.compare(this.name, other.name); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof BackendColumn)) { + return false; + } + BackendColumn other = (BackendColumn) obj; + return Bytes.equals(this.name, other.name) && + Bytes.equals(this.value, other.value); + } + } + + class RefCounter { + + final AtomicInteger refCount; + + RefCounter(AtomicInteger refCount) { + (this.refCount = refCount).incrementAndGet(); + } + + public void release() { + if (0 == this.refCount.decrementAndGet()) { + shutdown(); + } + } + } +} diff --git a/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/ScanIterator.java b/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/ScanIterator.java new file mode 100644 index 0000000000..eb0c0ef145 --- /dev/null +++ b/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/ScanIterator.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.rocksdb.access; + +import java.io.Closeable; + +public interface ScanIterator extends Closeable { + + boolean hasNext(); + + boolean isValid(); + + T next(); + + default long count() { + return 0; + } + + default byte[] position() { + return new byte[0]; + } + + default void seek(byte[] position) { + } + + @Override + void close(); + + abstract class Trait { + + public static final int SCAN_ANY = 0x80; + public static final int SCAN_PREFIX_BEGIN = 0x01; + public static final int SCAN_PREFIX_END = 0x02; + public static final int SCAN_GT_BEGIN = 0x04; + public static final int SCAN_GTE_BEGIN = 0x0c; + public static final int SCAN_LT_END = 0x10; + public static final int SCAN_LTE_END = 0x30; + public static final int SCAN_KEYONLY = 0x40; + public static final int SCAN_HASHCODE = 0x100; + } +} diff --git a/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/SessionOperator.java b/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/SessionOperator.java new file mode 100644 index 0000000000..12c0d3759f --- /dev/null +++ b/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/SessionOperator.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.rocksdb.access; + +import org.apache.hugegraph.store.term.HgPair; + +public interface SessionOperator { + + HgPair keyRange(String table); + + void compactRange(String table) throws DBStoreException; + + void compactRange() throws DBStoreException; + + void put(String table, byte[] key, byte[] value) throws DBStoreException; + + ScanIterator scan(String tableName); + + ScanIterator scan(String tableName, byte[] prefix); + + ScanIterator scan(String tableName, byte[] prefix, int scanType); + + ScanIterator scan(String tableName, byte[] keyFrom, byte[] keyTo, int scanType); + + /** + * 扫描所有cf指定范围的数据 + */ + ScanIterator scanRaw(byte[] keyFrom, byte[] keyTo, long startSeqNum); + + long keyCount(byte[] start, byte[] end, String tableName); + + long estimatedKeyCount(String tableName); + + /* + * only support 'long data' operator + * */ + void merge(String table, byte[] key, byte[] value) throws DBStoreException; + + void increase(String table, byte[] key, byte[] value) throws DBStoreException; + + void delete(String table, byte[] key) throws DBStoreException; + + void deleteSingle(String table, byte[] key) throws DBStoreException; + + void deletePrefix(String table, byte[] key) throws DBStoreException; + + void deleteRange(String table, byte[] keyFrom, byte[] keyTo) throws DBStoreException; + + /** + * 删除所有cf指定范围的数据 + */ + void deleteRange(byte[] keyFrom, byte[] keyTo) throws DBStoreException; + + byte[] get(String table, byte[] key) throws DBStoreException; + + void prepare(); + + Integer commit() throws DBStoreException; + + void rollback(); + + RocksDBSession getDBSession(); +} diff --git a/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/SessionOperatorImpl.java b/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/SessionOperatorImpl.java new file mode 100644 index 0000000000..f4f8987151 --- /dev/null +++ b/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/SessionOperatorImpl.java @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.rocksdb.access; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Iterator; + +import org.apache.hugegraph.rocksdb.access.RocksDBSession.CFHandleLock; +import org.apache.hugegraph.rocksdb.access.util.Asserts; +import org.apache.hugegraph.store.term.HgPair; +import org.apache.hugegraph.util.Bytes; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.Slice; +import org.rocksdb.Snapshot; +import org.rocksdb.WriteBatch; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class SessionOperatorImpl implements SessionOperator { + + private final RocksDB db; + private final RocksDBSession session; + private WriteBatch batch; + + public SessionOperatorImpl(RocksDBSession session) { + this.session = session; + this.db = session.getDB(); + } + + public static final byte[] increaseOne(byte[] bytes) { + final byte BYTE_MAX_VALUE = (byte) 0xff; + assert bytes.length > 0; + byte last = bytes[bytes.length - 1]; + if (last != BYTE_MAX_VALUE) { + bytes[bytes.length - 1] += (byte) 0x01; + } else { + // Process overflow (like [1, 255] => [2, 0]) + int i = bytes.length - 1; + for (; i > 0 && bytes[i] == BYTE_MAX_VALUE; --i) { + bytes[i] += (byte) 0x01; + } + if (bytes[i] == BYTE_MAX_VALUE) { + assert i == 0; + throw new DBStoreException("Unable to increase bytes: %s", Bytes.toHex(bytes)); + } + bytes[i] += (byte) 0x01; + } + return bytes; + } + + public RocksDB rocksdb() { + return db; + } + + @Override + public RocksDBSession getDBSession() { + return session; + } + + private CFHandleLock getLock(String table) { + CFHandleLock cf = this.session.getCFHandleLock(table); + return cf; + } + + @Override + public HgPair keyRange(String table) { + byte[] startKey, endKey; + try (CFHandleLock handle = this.getLock(table); + RocksIterator iter = rocksdb().newIterator(handle.get())) { + iter.seekToFirst(); + if (!iter.isValid()) { + return null; + } + startKey = iter.key(); + iter.seekToLast(); + if (!iter.isValid()) { + return new HgPair<>(startKey, null); + } + endKey = iter.key(); + } + return new HgPair<>(startKey, endKey); + } + + @Override + public void compactRange(String table) throws DBStoreException { + try (CFHandleLock handle = this.getLock(table)) { + rocksdb().compactRange(handle.get()); + } catch (RocksDBException e) { + throw new DBStoreException(e); + } + } + + @Override + public void compactRange() throws DBStoreException { + for (String name : session.getTables().keySet()) { + compactRange(name); + } + } + + @Override + public void put(String table, byte[] key, byte[] value) throws DBStoreException { + try { + this.getBatch().put(session.getCF(table), key, value); + } catch (RocksDBException e) { + throw new DBStoreException(e); + } + } + + /* + * only support 'long data' operator + */ + @Override + public void merge(String table, byte[] key, byte[] value) throws DBStoreException { + try { + this.getBatch().merge(session.getCF(table), key, value); + } catch (RocksDBException e) { + throw new DBStoreException(e); + } + } + + @Override + public void increase(String table, byte[] key, byte[] value) throws DBStoreException { + try (CFHandleLock cf = this.getLock(table)) { + rocksdb().merge(cf.get(), key, value); + } catch (RocksDBException e) { + throw new DBStoreException(e); + } + } + + @Override + public void delete(String table, byte[] key) throws DBStoreException { + try { + this.getBatch().delete(session.getCF(table), key); + } catch (RocksDBException e) { + throw new DBStoreException(e); + } + } + + @Override + public void deleteSingle(String table, byte[] key) throws DBStoreException { + try { + this.getBatch().singleDelete(session.getCF(table), key); + } catch (RocksDBException e) { + throw new DBStoreException(e); + } + } + + @Override + public void deletePrefix(String table, byte[] key) throws DBStoreException { + byte[] keyFrom = key; + byte[] keyTo = Arrays.copyOf(key, key.length); + keyTo = increaseOne(keyTo); + try { + this.getBatch().deleteRange(session.getCF(table), keyFrom, keyTo); + } catch (RocksDBException e) { + throw new DBStoreException(e); + } + } + + @Override + public void deleteRange(String table, byte[] keyFrom, byte[] keyTo) throws DBStoreException { + Asserts.isTrue(keyFrom != null, "KeyFrom is null"); + Asserts.isTrue(keyTo != null, "KeyTo is null"); + + if (Bytes.compare(keyTo, keyFrom) < 0) { + throw new DBStoreException("[end key: %s ] is lower than [start key: %s]", + Arrays.toString(keyTo), Arrays.toString(keyFrom)); + } + + try { + this.prepare(); + this.getBatch().deleteRange(session.getCF(table), keyFrom, keyTo); + this.commit(); + } catch (RocksDBException e) { + this.rollback(); + throw new DBStoreException(e); + } + } + + @Override + public void deleteRange(byte[] keyFrom, byte[] keyTo) throws DBStoreException { + for (String name : session.getTables().keySet()) { + deleteRange(name, keyFrom, keyTo); + } + } + + @Override + public byte[] get(String table, byte[] key) throws DBStoreException { + try (CFHandleLock cf = this.getLock(table)) { + return rocksdb().get(cf.get(), key); + } catch (RocksDBException e) { + throw new DBStoreException(e); + } + } + + @Override + public void prepare() { + this.session.getCfHandleReadLock().lock(); + } + + /** + * commit抛出异常后一定要调用rollback,否则会造成cfHandleReadLock未释放 + */ + @Override + public Integer commit() throws DBStoreException { + int count = this.getBatch().count(); + if (count > 0) { + try { + rocksdb().write(session.getWriteOptions(), this.batch); + } catch (RocksDBException e) { + throw new DBStoreException(e); + } + // Clear batch if write() successfully (retained if failed) + this.batch.clear(); + this.batch.close(); + this.batch = null; + } + this.session.getCfHandleReadLock().unlock(); + return count; + } + + @Override + public void rollback() { + try { + if (this.batch != null) { + this.batch.clear(); + this.batch.close(); + } + this.batch = null; + } finally { + try { + this.session.getCfHandleReadLock().unlock(); + } catch (Exception e) { + log.error("rollback {}", e); + } + } + } + + @Override + public ScanIterator scan(String tableName) { + try (CFHandleLock handle = this.getLock(tableName)) { + if (handle == null) { + log.info("no find table : {}", tableName); + return null; + } + return new RocksDBScanIterator(this.rocksdb().newIterator(handle.get()), null, null, + ScanIterator.Trait.SCAN_ANY, + this.session.getRefCounter()); + } + } + + @Override + public ScanIterator scan(String tableName, byte[] prefix) { + return scan(tableName, prefix, 0); + } + + @Override + public ScanIterator scan(String tableName, byte[] prefix, int scanType) { + try (CFHandleLock handle = this.getLock(tableName)) { + if (handle == null) { + log.info("no find table: {} for scanning with prefix: {}", tableName, + new String(prefix)); + return null; + } + return new RocksDBScanIterator(this.rocksdb().newIterator(handle.get()), prefix, null, + ScanIterator.Trait.SCAN_PREFIX_BEGIN | scanType, + this.session.getRefCounter()); + } + } + + @Override + public ScanIterator scan(String tableName, byte[] keyFrom, byte[] keyTo, int scanType) { + try (CFHandleLock handle = this.getLock(tableName)) { + if (handle == null) { + log.info("no find table: {} for scantype: {}", tableName, scanType); + return null; + } + return new RocksDBScanIterator(this.rocksdb().newIterator(handle.get()), keyFrom, keyTo, + scanType, + this.session.getRefCounter()); + } + } + + /** + * 遍历所有cf指定范围的数据 + * TODO: rocksdb7.x 不支持 setStartSeqNum,改为使用 Timestamp + * refer: https://github.com/facebook/rocksdb/wiki/User-defined-Timestamp + */ + @Override + public ScanIterator scanRaw(byte[] keyFrom, byte[] keyTo, long startSeqNum) { + int kNumInternalBytes = 8; //internal key 增加的8个字节后缀 + Snapshot snapshot = rocksdb().getSnapshot(); + Iterator cfIterator = session.getTables().keySet().iterator(); + + return new ScanIterator() { + String cfName = null; + + @Override + public boolean hasNext() { + return cfIterator.hasNext(); + } + + @Override + public boolean isValid() { + return cfIterator.hasNext(); + } + + @Override + public T next() { + RocksIterator iterator = null; + ReadOptions readOptions = new ReadOptions() + .setSnapshot(snapshot); + if (keyFrom != null) { + readOptions.setIterateLowerBound(new Slice(keyFrom)); + } + if (keyTo != null) { + readOptions.setIterateUpperBound(new Slice(keyTo)); + } + while (iterator == null && cfIterator.hasNext()) { + cfName = cfIterator.next(); + try (CFHandleLock handle = getLock(cfName)) { + iterator = rocksdb().newIterator(handle.get(), readOptions); + iterator.seekToFirst(); + } + } + if (iterator == null) { + return null; + } + RocksIterator finalIterator = iterator; + return (T) new ScanIterator() { + private final ReadOptions holdReadOptions = readOptions; + + @Override + public boolean hasNext() { + return finalIterator.isValid(); + } + + @Override + public boolean isValid() { + return finalIterator.isValid(); + } + + @Override + public T next() { + byte[] key = finalIterator.key(); + if (startSeqNum > 0) { + key = Arrays.copyOfRange(key, 0, key.length - kNumInternalBytes); + } + RocksDBSession.BackendColumn col = + RocksDBSession.BackendColumn.of(key, finalIterator.value()); + finalIterator.next(); + return (T) col; + } + + @Override + public void close() { + finalIterator.close(); + holdReadOptions.close(); + } + + }; + } + + @Override + public void close() { + rocksdb().releaseSnapshot(snapshot); + } + + @Override + public byte[] position() { + return cfName.getBytes(StandardCharsets.UTF_8); + + } + }; + } + + @Override + public long keyCount(byte[] start, byte[] end, String tableName) { + ScanIterator it = scan(tableName, start, end, ScanIterator.Trait.SCAN_LT_END); + return it.count(); + } + + @Override + public long estimatedKeyCount(String tableName) throws DBStoreException { + try { + return this.rocksdb() + .getLongProperty(session.getCF(tableName), "rocksdb.estimate-num-keys"); + } catch (RocksDBException e) { + throw new DBStoreException(e); + } + } + + private WriteBatch getBatch() { + if (this.batch == null) { + this.batch = new WriteBatch(); + } + return this.batch; + } +} diff --git a/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/util/Asserts.java b/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/util/Asserts.java new file mode 100644 index 0000000000..f77791dd1b --- /dev/null +++ b/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/util/Asserts.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.rocksdb.access.util; + +import java.util.function.Supplier; + +public final class Asserts { + + public static void isTrue(boolean expression, String message) { + if (message == null) { + throw new IllegalArgumentException("message is null"); + } + if (!expression) { + throw new IllegalArgumentException(message); + } + } + + public static void isTrue(boolean expression, Supplier s) { + if (s == null) { + throw new IllegalArgumentException("Supplier is null"); + } + if (!expression) { + throw s.get(); + } + } + +} diff --git a/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/util/CRC64.java b/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/util/CRC64.java new file mode 100644 index 0000000000..db39ab697c --- /dev/null +++ b/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/util/CRC64.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.rocksdb.access.util; + +import java.util.zip.Checksum; + +public class CRC64 implements Checksum { + + private static final long[] CRC_TABLE = new long[]{0x0000000000000000L, 0x42F0E1EBA9EA3693L, + 0x85E1C3D753D46D26L, 0xC711223CFA3E5BB5L, + 0x493366450E42ECDFL, 0x0BC387AEA7A8DA4CL, + 0xCCD2A5925D9681F9L, 0x8E224479F47CB76AL, + 0x9266CC8A1C85D9BEL, 0xD0962D61B56FEF2DL, + 0x17870F5D4F51B498L, 0x5577EEB6E6BB820BL, + 0xDB55AACF12C73561L, 0x99A54B24BB2D03F2L, + 0x5EB4691841135847L, 0x1C4488F3E8F96ED4L, + 0x663D78FF90E185EFL, 0x24CD9914390BB37CL, + 0xE3DCBB28C335E8C9L, 0xA12C5AC36ADFDE5AL, + 0x2F0E1EBA9EA36930L, 0x6DFEFF5137495FA3L, + 0xAAEFDD6DCD770416L, 0xE81F3C86649D3285L, + 0xF45BB4758C645C51L, 0xB6AB559E258E6AC2L, + 0x71BA77A2DFB03177L, 0x334A9649765A07E4L, + 0xBD68D2308226B08EL, 0xFF9833DB2BCC861DL, + 0x388911E7D1F2DDA8L, 0x7A79F00C7818EB3BL, + 0xCC7AF1FF21C30BDEL, 0x8E8A101488293D4DL, + 0x499B3228721766F8L, 0x0B6BD3C3DBFD506BL, + 0x854997BA2F81E701L, 0xC7B97651866BD192L, + 0x00A8546D7C558A27L, 0x4258B586D5BFBCB4L, + 0x5E1C3D753D46D260L, 0x1CECDC9E94ACE4F3L, + 0xDBFDFEA26E92BF46L, 0x990D1F49C77889D5L, + 0x172F5B3033043EBFL, 0x55DFBADB9AEE082CL, + 0x92CE98E760D05399L, 0xD03E790CC93A650AL, + 0xAA478900B1228E31L, 0xE8B768EB18C8B8A2L, + 0x2FA64AD7E2F6E317L, 0x6D56AB3C4B1CD584L, + 0xE374EF45BF6062EEL, 0xA1840EAE168A547DL, + 0x66952C92ECB40FC8L, 0x2465CD79455E395BL, + 0x3821458AADA7578FL, 0x7AD1A461044D611CL, + 0xBDC0865DFE733AA9L, 0xFF3067B657990C3AL, + 0x711223CFA3E5BB50L, 0x33E2C2240A0F8DC3L, + 0xF4F3E018F031D676L, 0xB60301F359DBE0E5L, + 0xDA050215EA6C212FL, 0x98F5E3FE438617BCL, + 0x5FE4C1C2B9B84C09L, 0x1D14202910527A9AL, + 0x93366450E42ECDF0L, 0xD1C685BB4DC4FB63L, + 0x16D7A787B7FAA0D6L, 0x5427466C1E109645L, + 0x4863CE9FF6E9F891L, 0x0A932F745F03CE02L, + 0xCD820D48A53D95B7L, 0x8F72ECA30CD7A324L, + 0x0150A8DAF8AB144EL, 0x43A04931514122DDL, + 0x84B16B0DAB7F7968L, 0xC6418AE602954FFBL, + 0xBC387AEA7A8DA4C0L, 0xFEC89B01D3679253L, + 0x39D9B93D2959C9E6L, 0x7B2958D680B3FF75L, + 0xF50B1CAF74CF481FL, 0xB7FBFD44DD257E8CL, + 0x70EADF78271B2539L, 0x321A3E938EF113AAL, + 0x2E5EB66066087D7EL, 0x6CAE578BCFE24BEDL, + 0xABBF75B735DC1058L, 0xE94F945C9C3626CBL, + 0x676DD025684A91A1L, 0x259D31CEC1A0A732L, + 0xE28C13F23B9EFC87L, 0xA07CF2199274CA14L, + 0x167FF3EACBAF2AF1L, 0x548F120162451C62L, + 0x939E303D987B47D7L, 0xD16ED1D631917144L, + 0x5F4C95AFC5EDC62EL, 0x1DBC74446C07F0BDL, + 0xDAAD56789639AB08L, 0x985DB7933FD39D9BL, + 0x84193F60D72AF34FL, 0xC6E9DE8B7EC0C5DCL, + 0x01F8FCB784FE9E69L, 0x43081D5C2D14A8FAL, + 0xCD2A5925D9681F90L, 0x8FDAB8CE70822903L, + 0x48CB9AF28ABC72B6L, 0x0A3B7B1923564425L, + 0x70428B155B4EAF1EL, 0x32B26AFEF2A4998DL, + 0xF5A348C2089AC238L, 0xB753A929A170F4ABL, + 0x3971ED50550C43C1L, 0x7B810CBBFCE67552L, + 0xBC902E8706D82EE7L, 0xFE60CF6CAF321874L, + 0xE224479F47CB76A0L, 0xA0D4A674EE214033L, + 0x67C58448141F1B86L, 0x253565A3BDF52D15L, + 0xAB1721DA49899A7FL, 0xE9E7C031E063ACECL, + 0x2EF6E20D1A5DF759L, 0x6C0603E6B3B7C1CAL, + 0xF6FAE5C07D3274CDL, 0xB40A042BD4D8425EL, + 0x731B26172EE619EBL, 0x31EBC7FC870C2F78L, + 0xBFC9838573709812L, 0xFD39626EDA9AAE81L, + 0x3A28405220A4F534L, 0x78D8A1B9894EC3A7L, + 0x649C294A61B7AD73L, 0x266CC8A1C85D9BE0L, + 0xE17DEA9D3263C055L, 0xA38D0B769B89F6C6L, + 0x2DAF4F0F6FF541ACL, 0x6F5FAEE4C61F773FL, + 0xA84E8CD83C212C8AL, 0xEABE6D3395CB1A19L, + 0x90C79D3FEDD3F122L, 0xD2377CD44439C7B1L, + 0x15265EE8BE079C04L, 0x57D6BF0317EDAA97L, + 0xD9F4FB7AE3911DFDL, 0x9B041A914A7B2B6EL, + 0x5C1538ADB04570DBL, 0x1EE5D94619AF4648L, + 0x02A151B5F156289CL, 0x4051B05E58BC1E0FL, + 0x87409262A28245BAL, 0xC5B073890B687329L, + 0x4B9237F0FF14C443L, 0x0962D61B56FEF2D0L, + 0xCE73F427ACC0A965L, 0x8C8315CC052A9FF6L, + 0x3A80143F5CF17F13L, 0x7870F5D4F51B4980L, + 0xBF61D7E80F251235L, 0xFD913603A6CF24A6L, + 0x73B3727A52B393CCL, 0x31439391FB59A55FL, + 0xF652B1AD0167FEEAL, 0xB4A25046A88DC879L, + 0xA8E6D8B54074A6ADL, 0xEA16395EE99E903EL, + 0x2D071B6213A0CB8BL, 0x6FF7FA89BA4AFD18L, + 0xE1D5BEF04E364A72L, 0xA3255F1BE7DC7CE1L, + 0x64347D271DE22754L, 0x26C49CCCB40811C7L, + 0x5CBD6CC0CC10FAFCL, 0x1E4D8D2B65FACC6FL, + 0xD95CAF179FC497DAL, 0x9BAC4EFC362EA149L, + 0x158E0A85C2521623L, 0x577EEB6E6BB820B0L, + 0x906FC95291867B05L, 0xD29F28B9386C4D96L, + 0xCEDBA04AD0952342L, 0x8C2B41A1797F15D1L, + 0x4B3A639D83414E64L, 0x09CA82762AAB78F7L, + 0x87E8C60FDED7CF9DL, 0xC51827E4773DF90EL, + 0x020905D88D03A2BBL, 0x40F9E43324E99428L, + 0x2CFFE7D5975E55E2L, 0x6E0F063E3EB46371L, + 0xA91E2402C48A38C4L, 0xEBEEC5E96D600E57L, + 0x65CC8190991CB93DL, 0x273C607B30F68FAEL, + 0xE02D4247CAC8D41BL, 0xA2DDA3AC6322E288L, + 0xBE992B5F8BDB8C5CL, 0xFC69CAB42231BACFL, + 0x3B78E888D80FE17AL, 0x7988096371E5D7E9L, + 0xF7AA4D1A85996083L, 0xB55AACF12C735610L, + 0x724B8ECDD64D0DA5L, 0x30BB6F267FA73B36L, + 0x4AC29F2A07BFD00DL, 0x08327EC1AE55E69EL, + 0xCF235CFD546BBD2BL, 0x8DD3BD16FD818BB8L, + 0x03F1F96F09FD3CD2L, 0x41011884A0170A41L, + 0x86103AB85A2951F4L, 0xC4E0DB53F3C36767L, + 0xD8A453A01B3A09B3L, 0x9A54B24BB2D03F20L, + 0x5D45907748EE6495L, 0x1FB5719CE1045206L, + 0x919735E51578E56CL, 0xD367D40EBC92D3FFL, + 0x1476F63246AC884AL, 0x568617D9EF46BED9L, + 0xE085162AB69D5E3CL, 0xA275F7C11F7768AFL, + 0x6564D5FDE549331AL, 0x279434164CA30589L, + 0xA9B6706FB8DFB2E3L, 0xEB46918411358470L, + 0x2C57B3B8EB0BDFC5L, 0x6EA7525342E1E956L, + 0x72E3DAA0AA188782L, 0x30133B4B03F2B111L, + 0xF7021977F9CCEAA4L, 0xB5F2F89C5026DC37L, + 0x3BD0BCE5A45A6B5DL, 0x79205D0E0DB05DCEL, + 0xBE317F32F78E067BL, 0xFCC19ED95E6430E8L, + 0x86B86ED5267CDBD3L, 0xC4488F3E8F96ED40L, + 0x0359AD0275A8B6F5L, 0x41A94CE9DC428066L, + 0xCF8B0890283E370CL, 0x8D7BE97B81D4019FL, + 0x4A6ACB477BEA5A2AL, 0x089A2AACD2006CB9L, + 0x14DEA25F3AF9026DL, 0x562E43B4931334FEL, + 0x913F6188692D6F4BL, 0xD3CF8063C0C759D8L, + 0x5DEDC41A34BBEEB2L, 0x1F1D25F19D51D821L, + 0xD80C07CD676F8394L, 0x9AFCE626CE85B507L}; + private long crc = 0; + + @Override + public void update(final int b) { + update((byte) (b & 0xFF)); + } + + public void update(final byte b) { + final int tab_index = ((int) (this.crc >> 56) ^ b) & 0xFF; + this.crc = CRC_TABLE[tab_index] ^ (this.crc << 8); + } + + @Override + public void update(final byte[] buffer, final int offset, int length) { + for (int i = offset; length > 0; length--) { + update(buffer[i++]); + } + } + + @Override + public void update(final byte[] buffer) { + for (int i = 0; i < buffer.length; i++) { + update(buffer[i]); + } + } + + @Override + public long getValue() { + return this.crc; + } + + @Override + public void reset() { + this.crc = 0; + } + +} diff --git a/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/util/ZipUtils.java b/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/util/ZipUtils.java new file mode 100644 index 0000000000..f243bd2847 --- /dev/null +++ b/hugegraph-store/hg-store-rocksdb/src/main/java/org/apache/hugegraph/rocksdb/access/util/ZipUtils.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.rocksdb.access.util; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.zip.CheckedInputStream; +import java.util.zip.CheckedOutputStream; +import java.util.zip.Checksum; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; +import java.util.zip.ZipOutputStream; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.io.output.NullOutputStream; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public final class ZipUtils { + + public static void compress(final String rootDir, final String sourceDir, + final String outputFile, final Checksum checksum) throws + IOException { + try (final FileOutputStream fos = new FileOutputStream(outputFile); + final CheckedOutputStream cos = new CheckedOutputStream(fos, checksum); + final ZipOutputStream zos = new ZipOutputStream(new BufferedOutputStream(cos))) { + ZipUtils.compressDirectoryToZipFile(rootDir, sourceDir, zos); + zos.flush(); + fos.getFD().sync(); + } + } + + private static void compressDirectoryToZipFile(final String rootDir, final String sourceDir, + final ZipOutputStream zos) throws IOException { + final String dir = Paths.get(rootDir, sourceDir).toString(); + final File[] files = new File(dir).listFiles(); + for (final File file : files) { + final String child = Paths.get(sourceDir, file.getName()).toString(); + if (file.isDirectory()) { + compressDirectoryToZipFile(rootDir, child, zos); + } else { + zos.putNextEntry(new ZipEntry(child)); + try (final FileInputStream fis = new FileInputStream(file); + final BufferedInputStream bis = new BufferedInputStream(fis)) { + IOUtils.copy(bis, zos); + } + } + } + } + + public static void decompress(final String sourceFile, final String outputDir, + final Checksum checksum) throws IOException { + try (final FileInputStream fis = new FileInputStream(sourceFile); + final CheckedInputStream cis = new CheckedInputStream(fis, checksum); + final ZipInputStream zis = new ZipInputStream(new BufferedInputStream(cis))) { + ZipEntry entry; + while ((entry = zis.getNextEntry()) != null) { + final String fileName = entry.getName(); + final Path entryPath = Paths.get(outputDir).resolve(fileName).normalize(); + if (!entryPath.startsWith(Paths.get(outputDir).normalize())) { + // The file path is not in the expected directory. There may be a Zip Slip + // vulnerability. Ignore it or handle it accordingly. + continue; + } + final File entryFile = entryPath.toFile(); + FileUtils.forceMkdir(entryFile.getParentFile()); + try (final FileOutputStream fos = new FileOutputStream(entryFile); + final BufferedOutputStream bos = new BufferedOutputStream(fos)) { + IOUtils.copy(zis, bos); + bos.flush(); + fos.getFD().sync(); + } + } + IOUtils.copy(cis, NullOutputStream.NULL_OUTPUT_STREAM); + } + } +} diff --git a/hugegraph-store/hg-store-rocksdb/src/main/java/place-holder.txt b/hugegraph-store/hg-store-rocksdb/src/main/java/place-holder.txt new file mode 100644 index 0000000000..e69de29bb2 diff --git a/hugegraph-store/hg-store-rocksdb/src/test/java/org/apache/hugegraph/rocksdb/access/RocksDBFactoryTest.java b/hugegraph-store/hg-store-rocksdb/src/test/java/org/apache/hugegraph/rocksdb/access/RocksDBFactoryTest.java new file mode 100644 index 0000000000..3e2b674e09 --- /dev/null +++ b/hugegraph-store/hg-store-rocksdb/src/test/java/org/apache/hugegraph/rocksdb/access/RocksDBFactoryTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.rocksdb.access; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.configuration2.MapConfiguration; +import org.apache.hugegraph.config.HugeConfig; +import org.apache.hugegraph.config.OptionSpace; +import org.junit.BeforeClass; + +public class RocksDBFactoryTest { + + @BeforeClass + public static void init() { + OptionSpace.register("rocksdb", + "org.apache.hugegraph.rocksdb.access.RocksDBOptions"); + RocksDBOptions.instance(); + + Map configMap = new HashMap<>(); + configMap.put("rocksdb.write_buffer_size", "1048576"); + configMap.put("rocksdb.bloom_filter_bits_per_key", "10"); + + HugeConfig hConfig = new HugeConfig(new MapConfiguration(configMap)); + RocksDBFactory rFactory = RocksDBFactory.getInstance(); + rFactory.setHugeConfig(hConfig); + } + + // @Test + public void testCreateSession() throws InterruptedException { + RocksDBFactory factory = RocksDBFactory.getInstance(); + try (RocksDBSession dbSession = factory.createGraphDB("./tmp", "test1")) { + SessionOperator op = dbSession.sessionOp(); + op.prepare(); + try { + op.put("tbl", "k1".getBytes(), "v1".getBytes()); + op.commit(); + } catch (Exception e) { + op.rollback(); + } + + } + factory.destroyGraphDB("test1"); + + Thread.sleep(100000); + } + + // @Test + public void testTotalKeys() { + RocksDBFactory dbFactory = RocksDBFactory.getInstance(); + System.out.println(dbFactory.getTotalSize()); + + System.out.println(dbFactory.getTotalKey().entrySet() + .stream().map(e -> e.getValue()).reduce(0L, Long::sum)); + } + + // @Test + public void releaseAllGraphDB() { + System.out.println(RocksDBFactory.class); + + RocksDBFactory rFactory = RocksDBFactory.getInstance(); + + if (rFactory.queryGraphDB("bj01") == null) { + rFactory.createGraphDB("./tmp", "bj01"); + } + + if (rFactory.queryGraphDB("bj02") == null) { + rFactory.createGraphDB("./tmp", "bj02"); + } + + if (rFactory.queryGraphDB("bj03") == null) { + rFactory.createGraphDB("./tmp", "bj03"); + } + + RocksDBSession dbSession = rFactory.queryGraphDB("bj01"); + + dbSession.checkTable("test"); + SessionOperator sessionOp = dbSession.sessionOp(); + sessionOp.prepare(); + + sessionOp.put("test", "hi".getBytes(), "byebye".getBytes()); + sessionOp.commit(); + + rFactory.releaseAllGraphDB(); + } +} diff --git a/hugegraph-store/hg-store-rocksdb/src/test/java/org/apache/hugegraph/rocksdb/access/RocksDBSessionTest.java b/hugegraph-store/hg-store-rocksdb/src/test/java/org/apache/hugegraph/rocksdb/access/RocksDBSessionTest.java new file mode 100644 index 0000000000..3f80191e4c --- /dev/null +++ b/hugegraph-store/hg-store-rocksdb/src/test/java/org/apache/hugegraph/rocksdb/access/RocksDBSessionTest.java @@ -0,0 +1,474 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.rocksdb.access; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.configuration2.MapConfiguration; +import org.apache.hugegraph.config.HugeConfig; +import org.apache.hugegraph.config.OptionSpace; +import org.junit.BeforeClass; + +public class RocksDBSessionTest { + + private final String graphName = "testDummy"; + + @BeforeClass + public static void init() { + OptionSpace.register("rocksdb", + "org.apache.hugegraph.rocksdb.access.RocksDBOptions"); + RocksDBOptions.instance(); + Map configMap = new HashMap<>(); + configMap.put("rocksdb.write_buffer_size", "1048576"); + configMap.put("rocksdb.bloom_filter_bits_per_key", "10"); + + HugeConfig hConfig = new HugeConfig(new MapConfiguration(configMap)); + RocksDBFactory rFactory = RocksDBFactory.getInstance(); + rFactory.setHugeConfig(hConfig); + } + + private static byte[] intToBytesForPartId(int v) { + short s = (short) v; + ByteBuffer buffer = ByteBuffer.allocate(Short.BYTES).order(ByteOrder.BIG_ENDIAN); + buffer.putShort(s); + return buffer.array(); + } + + public static byte[] byteCompose(byte[] b1, byte[] b2) { + byte[] b3 = new byte[b1.length + b2.length]; + System.arraycopy(b1, 0, b3, 0, b1.length); + System.arraycopy(b2, 0, b3, b1.length, b2.length); + return b3; + } + + private static byte[] keyAppendPartId(int partId, byte[] key) { + byte[] partBytes = intToBytesForPartId(partId); + byte[] targetKey = byteCompose(partBytes, key); + return targetKey; + } + + private static byte[] getPartStartKey(int partId) { + return intToBytesForPartId(partId); + } + + private static byte[] getPartEndKey(int partId) { + return intToBytesForPartId(partId + 1); + } + + // @Test + public void put() { + System.out.println("RocksDBSessionTest::put test"); + + RocksDBFactory rFactory = RocksDBFactory.getInstance(); + + RocksDBSession rSession = rFactory.createGraphDB("./tmp", graphName); + + String tName = "t1"; + rSession.checkTable(tName); + + SessionOperator sessionOp = rSession.sessionOp(); + sessionOp.put(tName, "a1".getBytes(), "f1".getBytes()); + sessionOp.commit(); + + String vRet = new String(rSession.sessionOp().get(tName, "a1".getBytes())); + assertEquals(vRet, "f1"); + } + + // @Test + public void selectTable() { + + System.out.println("RocksDBSessionTest::selectTable test"); + + RocksDBFactory rFactory = RocksDBFactory.getInstance(); + + RocksDBSession rSession = rFactory.createGraphDB("./tmp", graphName); + + String tName = "t2"; + rSession.checkTable(tName); + + assertTrue(rSession.tableIsExist(tName)); + + } + + // @Test + public void batchPut() { + System.out.println("RocksDBSessionTest::batchPut test"); + + RocksDBFactory rFactory = RocksDBFactory.getInstance(); + + RocksDBSession rSession = rFactory.createGraphDB("./tmp", graphName); + + String tName = "t3"; + String tName2 = "t4"; + + SessionOperator sessionOp = rSession.sessionOp(); + sessionOp.put(tName, "a1".getBytes(), "f1".getBytes()); + sessionOp.put(tName, "a2".getBytes(), "f2".getBytes()); + sessionOp.put(tName, "a3".getBytes(), "f3".getBytes()); + sessionOp.put(tName, "a4".getBytes(), "f4".getBytes()); + sessionOp.put(tName, "a5".getBytes(), "f5".getBytes()); + sessionOp.put(tName, "a6".getBytes(), "f6".getBytes()); + sessionOp.commit(); + + sessionOp.put(tName2, "m1".getBytes(), "k1".getBytes()); + sessionOp.put(tName2, "m2".getBytes(), "k2".getBytes()); + sessionOp.put(tName2, "m3".getBytes(), "k3".getBytes()); + sessionOp.put(tName, "a7".getBytes(), "f7".getBytes()); + sessionOp.commit(); + + String vRet1 = new String(rSession.sessionOp().get(tName, "a1".getBytes())); + String vRet2 = new String(rSession.sessionOp().get(tName2, "m1".getBytes())); + String vRet3 = new String(rSession.sessionOp().get(tName, "a7".getBytes())); + + assertEquals(vRet1, "f1"); + assertEquals(vRet2, "k1"); + assertEquals(vRet3, "f7"); + + } + + // @Test + public void get() { + System.out.println("RocksDBSessionTest::get test"); + + RocksDBFactory rFactory = RocksDBFactory.getInstance(); + + RocksDBSession rSession = rFactory.createGraphDB("./tmp", graphName); + + String tName = "t5"; + rSession.checkTable(tName); + SessionOperator sessionOp = rSession.sessionOp(); + sessionOp.put(tName, "hash".getBytes(), "youareok".getBytes()); + sessionOp.commit(); + + String vRet = new String(rSession.sessionOp().get(tName, "hash".getBytes())); + assertEquals(vRet, "youareok"); + } + + // @Test + public void createTables() { + System.out.println("RocksDBSessionTest::createTables test"); + + RocksDBFactory rFactory = RocksDBFactory.getInstance(); + + RocksDBSession rSession = rFactory.createGraphDB("./tmp", graphName); + + rSession.createTables("fly1", "fly2", "fly3"); + + SessionOperator sessionOp = rSession.sessionOp(); + sessionOp.put("fly1", "cat1".getBytes(), "hit1".getBytes()); + sessionOp.put("fly2", "cat2".getBytes(), "hit2".getBytes()); + sessionOp.put("fly3", "cat3".getBytes(), "hit3".getBytes()); + sessionOp.commit(); + + assertTrue(rSession.tableIsExist("fly1")); + assertTrue(rSession.tableIsExist("fly2")); + assertTrue(rSession.tableIsExist("fly3")); + + } + + // @Test + public void dropTables() { + System.out.println("RocksDBSessionTest::dropTables test"); + + RocksDBFactory rFactory = RocksDBFactory.getInstance(); + + RocksDBSession rSession = rFactory.createGraphDB("./tmp", graphName); + + rSession.createTables("dummy1"); + + assertTrue(rSession.tableIsExist("dummy1")); + + rSession.dropTables("dummy1"); + + assertFalse(rSession.tableIsExist("dummy1")); + + } + + // @Test + public void scan() { + System.out.println("RocksDBSessionTest::scan test"); + + RocksDBFactory rFactory = RocksDBFactory.getInstance(); + String graph1 = "tBatch"; + rFactory.createGraphDB("./tmp", graphName); + RocksDBSession rSession = rFactory.createGraphDB("./tmp", graph1); + + String tName = "t6"; + rSession.checkTable(tName); + + SessionOperator sessionOp = rSession.sessionOp(); + sessionOp.put(tName, "box1".getBytes(), "gift1".getBytes()); + sessionOp.put(tName, "box2".getBytes(), "gift2".getBytes()); + sessionOp.put(tName, "box3".getBytes(), "gift3".getBytes()); + sessionOp.put(tName, "box4".getBytes(), "gift4".getBytes()); + sessionOp.put(tName, "box5".getBytes(), "gift5".getBytes()); + sessionOp.put(tName, "box6".getBytes(), "gift6".getBytes()); + sessionOp.put(tName, "box7".getBytes(), "gift7".getBytes()); + sessionOp.put(tName, "box8".getBytes(), "gift8".getBytes()); + sessionOp.commit(); + + //scan table + ScanIterator it = sessionOp.scan(tName); + while (it.hasNext()) { + RocksDBSession.BackendColumn col = it.next(); + System.out.println(new String(col.name) + " : " + new String(col.value)); + } + + long c = rSession.getApproximateDataSize(); + System.out.println(c); + + long c1 = RocksDBFactory.getInstance().getTotalSize(); + System.out.println(c1); + } + + // @Test + public void testScan() { + System.out.println("RocksDBSessionTest::testScan test"); + + RocksDBFactory rFactory = RocksDBFactory.getInstance(); + + RocksDBSession rSession = rFactory.createGraphDB("./tmp", graphName); + + String tName = "t7"; + rSession.checkTable(tName); + + SessionOperator sessionOp = rSession.sessionOp(); + sessionOp.put(tName, "box1".getBytes(), "gift1".getBytes()); + sessionOp.put(tName, "box2".getBytes(), "gift2".getBytes()); + sessionOp.put(tName, "box3".getBytes(), "gift3".getBytes()); + sessionOp.put(tName, "room1".getBytes(), "killer1".getBytes()); + sessionOp.put(tName, "box4".getBytes(), "gift4".getBytes()); + sessionOp.put(tName, "box5".getBytes(), "gift5".getBytes()); + sessionOp.put(tName, "room2".getBytes(), "killer2".getBytes()); + sessionOp.put(tName, "box6".getBytes(), "gift6".getBytes()); + sessionOp.put(tName, "boat1".getBytes(), "girl1".getBytes()); + sessionOp.put(tName, "box7".getBytes(), "gift7".getBytes()); + sessionOp.put(tName, "box8".getBytes(), "gift8".getBytes()); + sessionOp.commit(); + + //prefix scan + ScanIterator it = sessionOp.scan(tName, "bo".getBytes()); + while (it.hasNext()) { + RocksDBSession.BackendColumn col = it.next(); + System.out.println(new String(col.name) + " : " + new String(col.value)); + } + + } + + // @Test + public void testScan1() { + System.out.println("RocksDBSessionTest::testScan1 test"); + + RocksDBFactory rFactory = RocksDBFactory.getInstance(); + + RocksDBSession rSession = rFactory.createGraphDB("./tmp", graphName); + + String tName = "t8"; + rSession.checkTable(tName); + + SessionOperator sessionOp = rSession.sessionOp(); + sessionOp.put(tName, "box1".getBytes(), "gift1".getBytes()); + sessionOp.put(tName, "box2".getBytes(), "gift2".getBytes()); + sessionOp.put(tName, "box3".getBytes(), "gift3".getBytes()); + sessionOp.put(tName, "room1".getBytes(), "killer1".getBytes()); + sessionOp.put(tName, "box4".getBytes(), "gift4".getBytes()); + sessionOp.put(tName, "box5".getBytes(), "gift5".getBytes()); + sessionOp.put(tName, "room2".getBytes(), "killer2".getBytes()); + sessionOp.put(tName, "box6".getBytes(), "gift6".getBytes()); + sessionOp.put(tName, "boat1".getBytes(), "girl1".getBytes()); + sessionOp.put(tName, "box7".getBytes(), "gift7".getBytes()); + sessionOp.put(tName, "box8".getBytes(), "gift8".getBytes()); + sessionOp.commit(); + + //range scan + ScanIterator it = sessionOp.scan(tName, "box2".getBytes(), "box5".getBytes(), + ScanIterator.Trait.SCAN_GTE_BEGIN | + ScanIterator.Trait.SCAN_LTE_END); + while (it.hasNext()) { + RocksDBSession.BackendColumn col = it.next(); + System.out.println(new String(col.name) + " : " + new String(col.value)); + } + + } + + // @Test + public void testCount() { + System.out.println("RocksDBSessionTest::testCount test"); + RocksDBFactory rFactory = RocksDBFactory.getInstance(); + RocksDBSession rSession = rFactory.createGraphDB("./tmp", graphName); + String tName = "c1"; + rSession.checkTable(tName); + + int part1 = 1; + int part2 = 2; + int part3 = 3; + SessionOperator sessionOp = rSession.sessionOp(); + sessionOp.prepare(); + sessionOp.put(tName, keyAppendPartId(part1, "k1".getBytes()), "v1".getBytes()); + sessionOp.put(tName, keyAppendPartId(part1, "k2".getBytes()), "v2".getBytes()); + sessionOp.put(tName, keyAppendPartId(part1, "k3".getBytes()), "v3".getBytes()); + sessionOp.put(tName, intToBytesForPartId(part2), "v".getBytes()); + sessionOp.put(tName, keyAppendPartId(part2, "k4".getBytes()), "v4".getBytes()); + sessionOp.put(tName, keyAppendPartId(part2, "k5".getBytes()), "v5".getBytes()); + sessionOp.put(tName, keyAppendPartId(part2, "k6".getBytes()), "v6".getBytes()); + sessionOp.put(tName, intToBytesForPartId(part3), "v".getBytes()); + sessionOp.put(tName, keyAppendPartId(part3, "k8".getBytes()), "v8".getBytes()); + sessionOp.put(tName, keyAppendPartId(part3, "k9".getBytes()), "v9".getBytes()); + sessionOp.put(tName, keyAppendPartId(part3, "k10".getBytes()), "v10".getBytes()); + sessionOp.put(tName, keyAppendPartId(part3, "k11".getBytes()), "v11".getBytes()); + sessionOp.commit(); + +// assertEquals(3, rSession.sessionOp().keyCount(tName, getPartStartKey(part1), +// getPartEndKey(part1))); +// assertEquals(4, rSession.sessionOp().keyCount(tName, getPartStartKey(part2), +// getPartEndKey(part2))); +// assertEquals(5, rSession.sessionOp().keyCount(tName, getPartStartKey(part3), +// getPartEndKey(part3))); + + assertEquals(12, rSession.sessionOp().keyCount("\0".getBytes(StandardCharsets.UTF_8), + "\255".getBytes(StandardCharsets.UTF_8), + tName)); + assertEquals(12, rSession.sessionOp().estimatedKeyCount(tName)); + } + + // @Test + public void merge() { + System.out.println("RocksDBSessionTest::merge test"); + + RocksDBFactory rFactory = RocksDBFactory.getInstance(); + + RocksDBSession rSession = rFactory.createGraphDB("./tmp", graphName); + + String tName = "room1"; + rSession.checkTable(tName); + byte[] x2 = longToByteArray(19); + System.out.println(x2); + SessionOperator sessionOp = rSession.sessionOp(); + byte[] x1 = sessionOp.get(tName, "p3".getBytes()); + String str1 = new String(x1); + if (x1 != null) { + long curValue = longFromByteArray(x1); + System.out.println("current value:" + curValue); + sessionOp.merge(tName, "p3".getBytes(), longToByteArray(10)); + sessionOp.commit(); + } + + byte[] value = rSession.sessionOp().get(tName, "p3".getBytes()); + final long longValue = longFromByteArray(value); + System.out.println("after merge value: " + longValue); + + sessionOp.put(tName, "p3".getBytes(), "19".getBytes()); + sessionOp.commit(); + + ScanIterator it = sessionOp.scan(tName, "p".getBytes()); + System.out.println("after put ------>"); + while (it.hasNext()) { + RocksDBSession.BackendColumn col = it.next(); + System.out.println(new String(col.name) + ":" + new String(col.value)); + } + + } + + // @Test + public void truncate() { + System.out.println("RocksDBSessionTest::truncate test"); + RocksDBFactory rFactory = RocksDBFactory.getInstance(); + RocksDBSession rSession = rFactory.createGraphDB("./tmp", "gg1"); + rSession.checkTable("f1"); + SessionOperator op = rSession.sessionOp(); + op.put("f1", "m1".getBytes(), "n1".getBytes()); + op.put("f1", "m2".getBytes(), "n2".getBytes()); + op.put("f1", "m3".getBytes(), "n3".getBytes()); + op.commit(); + rSession.checkTable("f2"); + + RocksDBSession rocksDBSession2 = rFactory.createGraphDB("./tmp", "gg2"); + rocksDBSession2.checkTable("txt"); + + rSession.truncate(); + op = rSession.sessionOp(); + op.put("f1", "beijing".getBytes(), "renkou".getBytes()); + op.commit(); + + rFactory.releaseAllGraphDB(); + } + + // @Test + public void batchPut2() { + RocksDBFactory rFactory = RocksDBFactory.getInstance(); + RocksDBSession rSession = rFactory.createGraphDB("./tmp", "gg1"); + rSession.checkTable("f1"); + SessionOperator op = rSession.sessionOp(); + try { + op.put("f1", "m2".getBytes(), "xx2".getBytes()); + op.put("f1", "m1".getBytes(), "xx1".getBytes()); + op.put("f1", "m3".getBytes(), "xx3".getBytes()); + op.put("f1", "m4".getBytes(), "xx5".getBytes()); + op.commit(); + op.deleteRange("f1", "m5".getBytes(), "m2".getBytes()); + + op.put("f2", new byte[]{1, 1}, new byte[]{1, -3}); + op.put("f2", new byte[]{1, -2}, new byte[]{2, 0}); + op.put("f2", new byte[]{1, 10}, new byte[]{1, 5}); + op.put("f2", new byte[]{1, 9}, new byte[]{1, 3}); + op.commit(); + + op.deleteRange("f2", new byte[]{1, 1}, new byte[]{1, -1}); + op.commit(); + + ScanIterator it = op.scan("f2"); + while (it.hasNext()) { + RocksDBSession.BackendColumn col = it.next(); + System.out.println(Arrays.toString(col.name) + "=>" + Arrays.toString(col.value)); + } + + } catch (DBStoreException e) { + System.out.println(e); + } + + } + + // @Test + public void doStuff() { + byte[] stuff = new byte[]{1, 3, 1, -3}; + System.out.println(Arrays.toString(stuff)); + } + + private byte[] longToByteArray(long l) { + ByteBuffer buf = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.LITTLE_ENDIAN); + buf.putLong(l); + return buf.array(); + } + + private long longFromByteArray(byte[] a) { + ByteBuffer buf = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.LITTLE_ENDIAN); + buf.put(a); + buf.flip(); + return buf.getLong(); + } + +} diff --git a/hugegraph-store/hg-store-rocksdb/src/test/java/org/apache/hugegraph/rocksdb/access/RocksdbDump.java b/hugegraph-store/hg-store-rocksdb/src/test/java/org/apache/hugegraph/rocksdb/access/RocksdbDump.java new file mode 100644 index 0000000000..abe9062423 --- /dev/null +++ b/hugegraph-store/hg-store-rocksdb/src/test/java/org/apache/hugegraph/rocksdb/access/RocksdbDump.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.rocksdb.access; + +import java.util.ArrayList; +import java.util.List; + +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.DBOptions; +import org.rocksdb.Options; +import org.rocksdb.ReadOptions; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; + +public class RocksdbDump { + + // @Test + public void dump() throws RocksDBException { + String dbPath = + "D:\\Workspaces\\baidu\\hugegraph\\hugegraph-store\\tmp\\8500\\db\\default" + + "\\hugegraph\\g"; + List cfDescriptors = new ArrayList<>(); + List columnFamilyBytes = RocksDB.listColumnFamilies(new Options(), dbPath); + ColumnFamilyOptions cfOptions = new ColumnFamilyOptions(); + if (columnFamilyBytes.size() > 0) { + for (byte[] columnFamilyByte : columnFamilyBytes) { + cfDescriptors.add(new ColumnFamilyDescriptor(columnFamilyByte, cfOptions)); + } + } + + final List columnFamilyHandles = new ArrayList<>(); + try (final DBOptions options = new DBOptions(); + final RocksDB db = RocksDB.open(options, dbPath, cfDescriptors, columnFamilyHandles)) { + for (ColumnFamilyHandle handle : columnFamilyHandles) { + System.out.println(new String(handle.getName()) + "---------------"); + try (RocksIterator iterator = db.newIterator(handle, new ReadOptions())) { + iterator.seekToFirst(); + while (iterator.isValid()) { + byte[] key = iterator.key(); + // System.out.println(new String(key) + " -- " + Bytes.toHex(key)); + iterator.next(); + } + } + } + } + } +} diff --git a/hugegraph-store/hg-store-rocksdb/src/test/java/org/apache/hugegraph/rocksdb/access/SnapshotManagerTest.java b/hugegraph-store/hg-store-rocksdb/src/test/java/org/apache/hugegraph/rocksdb/access/SnapshotManagerTest.java new file mode 100644 index 0000000000..2d5842a114 --- /dev/null +++ b/hugegraph-store/hg-store-rocksdb/src/test/java/org/apache/hugegraph/rocksdb/access/SnapshotManagerTest.java @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.rocksdb.access; + +import java.io.File; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hugegraph.store.term.HgPair; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class SnapshotManagerTest { + + final String tempSuffix = "_temp_"; + final String graphName = "test_graph"; + + // @Test + public void testRetrieveIndex() throws DBStoreException { + String snapshotPath = + "D:\\Code\\baidu\\starhugegraph\\hugegraph-store\\tmp\\8501\\raft\\1/snapshot" + + "\\snapshot_40\\graph_1_p1"; + long lastIndex = 0L; + File file = new File(snapshotPath); + File parentFile = new File(file.getParent()); + String[] arr = parentFile.getName().split("_"); + if (arr.length >= 2) { + lastIndex = Long.parseLong(arr[arr.length - 1]); + } else { + throw new DBStoreException(String.format("Invalid Snapshot path %s", snapshotPath)); + } + + System.out.println(lastIndex); + } + + // @Test + public void testFoundMaxDir() { +// String path = "D:\\Code\\baidu\\starhugegraph\\hugegraph-store\\tmp\\8503\\db\\default +// \\hugegraph\\g\\0_123"; +// String path = "D:\\Code\\baidu\\starhugegraph\\hugegraph-store\\tmp\\8503\\db\\default +// \\hugegraph\\g\\0"; +// String path = "D:\\Code\\baidu\\starhugegraph\\hugegraph-store\\tmp\\8501\\db\\default +// \\hugegraph\\g\\0"; +// String path = "D:\\tmp\\db\\0"; +// String path = "D:\\tmp\\db\\0_111"; + String path = "D:\\tmp\\db\\0_111_22222"; + File file = new File(path); + int strIndex = file.getName().indexOf("_"); + + String defaultName; + if (strIndex < 0) { + defaultName = file.getName(); + } else { + defaultName = file.getName().substring(0, strIndex); + } + String prefix = defaultName + "_"; + File parentFile = new File(file.getParent()); + final List> dbs = new ArrayList<>(); + final File[] files = parentFile.listFiles(); + if (files != null) { + // search all db path + for (final File sFile : files) { + final String name = sFile.getName(); + if (!name.startsWith(prefix) && !name.equals(defaultName)) { + continue; + } + if (name.endsWith(tempSuffix)) { + continue; + } + long v1 = -1L; + long v2 = -1L; + if (name.length() > defaultName.length()) { + String[] versions = name.substring(prefix.length()).split("_"); + if (versions.length == 1) { + v1 = Long.parseLong(versions[0]); + } else if (versions.length == 2) { + v1 = Long.parseLong(versions[0]); + v2 = Long.parseLong(versions[1]); + } else { + continue; + } + } + dbs.add(new HgPair<>(v1, v2)); + } + } + + // get last index db path + String latestDBPath = ""; + if (!dbs.isEmpty()) { + dbs.sort((o1, o2) -> o1.getKey().equals(o2.getKey()) ? + o1.getValue().compareTo(o2.getValue()) : + o1.getKey().compareTo(o2.getKey())); + final int dbCount = dbs.size(); + + // delete old db + for (int i = 0; i < dbCount; i++) { + final HgPair pair = dbs.get(i); + String curDBName; + if (pair.getKey() == -1L) { + curDBName = defaultName; + } else if (pair.getValue() == -1L) { + curDBName = String.format("%s_%d", defaultName, pair.getKey()); + } else { + curDBName = + String.format("%s_%d_%d", defaultName, pair.getKey(), pair.getValue()); + } + String curDBPath = Paths.get(parentFile.getPath(), curDBName).toString(); + if (i == dbCount - 1) { + latestDBPath = curDBPath; + } else { + log.info("delete old dbpath {}", curDBPath); + } + } + + } else { + latestDBPath = Paths.get(parentFile.getPath(), defaultName).toString(); + } + log.info("{} latest db path {}", this.graphName, latestDBPath); + + } + + // @Test + public void testDefaultPath() { +// String latestDBPath = "D:\\Code\\baidu\\starhugegraph\\hugegraph-store\\tmp\\8501\\db +// \\default\\hugegraph\\g\\0_123"; + String latestDBPath = + "D:\\Code\\baidu\\starhugegraph\\hugegraph-store\\tmp\\8501\\db\\default" + + "\\hugegraph\\g\\0"; + File file = new File(latestDBPath); + String parent = file.getParent(); + String defaultName = file.getName().split("_")[0]; + String defaultPath = Paths.get(parent, defaultName).toString(); + System.out.println(defaultPath); + } + +// private static final String graphName = "unit-test-graph"; +// private static final String graphName2 = "unit-test-graph-2"; +// private static final int partition_1 = 1; +// private static final int partition_2 = 2; +// private static final int partition_3 = 3; +// private static final int partition_4 = 4; +// private static ArrayList tableList = new ArrayList(); +// private static ArrayList partitionList = new ArrayList(); +// private static RocksDBFactory factory = RocksDBFactory.getInstance(); +// private static HugeConfig hConfig; +// private static HugeConfig hConfig2; +// private int index = 0; +// +// private static byte[] values = new byte[1024]; +// +// +// // @BeforeClass +// public static void beforeClass() { +// // Register config +// OptionSpace.register("rocksdb", +// "org.apache.hugegraph.rocksdb.access.RocksDBOptions"); +// // Register backend +// URL configPath = SnapshotManagerTest.class.getResource("/hugegraph.properties"); +// URL configPath2 = SnapshotManagerTest.class.getResource("/hugegraph-2.properties"); +// hConfig = new HugeConfig(configPath.getPath()); +// hConfig2 = new HugeConfig(configPath2.getPath()); +// factory.setHugeConfig(hConfig); +// // init tables +// tableList.add("t1"); +// tableList.add("t2"); +// tableList.add("t3"); +// tableList.add("t4"); +// tableList.add("abc/t5"); +// tableList.add("../t6"); +// // init partitons +// partitionList.add(partition_1); +// partitionList.add(partition_2); +// partitionList.add(partition_3); +// partitionList.add(partition_4); +// +// for (int i = 0; i < 1024; i++) +// values[i] = (byte) (i % 0x7F); +// +// } +// +// @AfterClass +// public static void afterClass() { +// factory.releaseAllGraphDB(); +// } +// // @Test +// public void test(){ +// Map configMap = new HashMap<>(); +// configMap.put("rocksdb.data_path", "tmp/test2/huge/"); +// configMap.put("rocksdb.wal_path", "tmp/test2/wal"); +// configMap.put("rocksdb.snapshot_path", "tmp/test2/snapshot"); +// configMap.put("rocksdb.write_buffer_size","1048576"); +// HugeConfig config = new HugeConfig(configMap); +// factory.setHugeConfig(config); +// RocksDBSession session = factory.createGraphDB(graphName, false); +// SessionOperator sessionOp = session.sessionOp(); +// +// String table = "t1"; +// +// String startKey = String.format("%d_%08d", partition_1, 0); +// String lastKey = String.format("%d_%08d", partition_1, 99); +// String endKey = String.format("%d_%08d", partition_1, 100); +// +// +// printKV(sessionOp, table, startKey); +// printKV(sessionOp, table, lastKey); +// } +// +// private void printKV(SessionOperator sessionOp, String table, String key) { +// byte[] value = sessionOp.get(table, key.getBytes()); +// if (value != null) +// System.out.printf("table=%s, key=%s, value=%s\n", table, key, new String(value)); +// else +// System.out.printf("table=%s, key=%s, does not found\n", table, key); +// } +// +// private void insertData(int count) { +// RocksDBSession session = factory.createGraphDB(graphName, false); +// SessionOperator sessionOp = session.sessionOp(); +// sessionOp.prepare(); +// sessionOp.prepare(); +// final int begin = index; +// final int end = index + count; +// System.out.printf("begin to insert data from index %d to %d\n", begin, end); +// +// for (; index < end; index++) { +// for (String table : tableList) { +// for (Integer partition : partitionList) { +// String key = String.format("%d_%08d", partition, index); +// String value = String.format("%s_%08d", table, index); +// sessionOp.put(table, key.getBytes(), value.getBytes()); +// } +// } +// if (index % 1000 == 999) { +// System.out.printf("insertData, commit index %d\n", index+1); +// sessionOp.commit(); +// sessionOp.prepare(); +// } +// } +// sessionOp.commit(); +// session.flush(); +// System.out.printf("inserted %d pairs data\n", count); +// } +// +// // @Test +// public void testSnapshotMetadata() { +// DBSnapshotMeta metadata = new DBSnapshotMeta(); +// metadata.setGraphName(graphName); +// metadata.setPartitionId(partition_1); +// metadata.setStartKey("k1".getBytes()); +// metadata.setEndKey("k9".getBytes()); +// HashMap sstFiles = new HashMap(); +// sstFiles.put("cf1", "cf1.sst"); +// sstFiles.put("cf2", "cf2.sst"); +// sstFiles.put("cf3", "cf3.sst"); +// sstFiles.put("cf4", "cf4.sst"); +// metadata.setCreatedDate(new Date()); +// metadata.setSstFiles(sstFiles); +// +// System.out.println("metadata object:"); +// System.out.println(metadata.toString()); +// +// String jsonStr = JSON.toJSONString(metadata); +// System.out.println("metadata json:"); +// System.out.println(jsonStr); +// +// DBSnapshotMeta metadata2 = JSON.parseObject(jsonStr, DBSnapshotMeta.class); +// System.out.println("metadata2 object:"); +// System.out.println(metadata2.toString()); +// +// assertEquals(metadata, metadata2); +// } +// +// // @Test +// public void testCompress() throws IOException { +// final String rootDir = "./tmp/fileutils"; +// final File dir1 = Paths.get(rootDir, "dir1").toFile(); +// final File dir2 = Paths.get(rootDir, "dir2").toFile(); +// final File dir3 = Paths.get(rootDir, "dir3").toFile(); +// final File dir4 = Paths.get(rootDir, "dir4").toFile(); +// final File file0 = Paths.get(rootDir, "file0").toFile(); +// final File file1 = Paths.get(dir1.getAbsolutePath(), "file1").toFile(); +// FileUtils.forceMkdirParent(dir1); +// FileUtils.forceMkdirParent(dir2); +// FileUtils.forceMkdirParent(dir3); +// FileUtils.deleteDirectory(dir2); +// FileUtils.forceMkdirParent(dir4); +// FileUtils.moveDirectory(dir3, dir1); +// FileUtils.touch(file0); +// FileUtils.touch(file1); +// final Checksum c1 = new CRC64(); +// ZipUtils.compress("./tmp", "fileutils", "./tmp/fileutils.zip", c1); +// +// FileUtils.deleteDirectory(new File("./tmp/fileutils")); +// +// final Checksum c2 = new CRC64(); +// ZipUtils.decompress("./tmp/fileutils.zip", "./tmp", c2); +// Assert.assertEquals(c1.getValue(), c2.getValue()); +// } +// +// +// // @Test +// public void testDecompress() throws IOException { +// final Checksum checksum = new CRC64(); +// String dir = "./tmp/server-00/snapshot/test/import/1"; +// String source = Paths.get(dir, "snapshot.zip").toString(); +// FileUtils.deleteDirectory(Paths.get(dir, "snapshot").toFile()); +// ZipUtils.decompress(source, dir, checksum); +// } +// +// +// // @Test +// public void testExportSnapshot() throws DBStoreException, IOException { +// final String exportDir = hConfig.getString("rocksdb.snapshot_path"); +// File exportDirFile = new File(exportDir); +// if (exportDirFile.exists()) { +// FileUtils.forceDelete(new File(exportDir)); +// } +// FileUtils.forceMkdir(new File(exportDir)); +// +// RocksDBSession session = factory.createGraphDB(graphName, false); +// insertData(1000); +// String startKey = String.format("%d_%08d", partition_1, 0); +// String endKey = String.format("%d_%08d", partition_1, 100); +// session.exportSnapshot(exportDir, 1, startKey.getBytes(), endKey.getBytes()); +// } +// +// // @Test +// public void testImportSnapshot() throws DBStoreException, IOException { +// final String importDir = hConfig.getString("rocksdb.snapshot_path"); +// FileUtils.forceMkdir(new File(importDir)); +// +// factory.setHugeConfig(hConfig2); +// RocksDBSession session = factory.createGraphDB(graphName, false); +// SessionOperator sessionOp = session.sessionOp(); +// +// String table = "t1"; +// +// String startKey = String.format("%d_%08d", partition_1, 0); +// String lastKey = String.format("%d_%08d", partition_1, 99); +// String endKey = String.format("%d_%08d", partition_1, 100); +// +// session.importSnapshot(importDir, partition_1); +// long keyCount = session.sessionOp().keyCount(table, startKey.getBytes(), endKey +// .getBytes()); +// System.out.printf("key count of %s is %d\n", graphName, keyCount); +// +// +// System.out.printf("after load snapshot\n"); +// printKV(sessionOp, table, startKey); +// printKV(sessionOp, table, lastKey); +// } +// +// // @Test +// public void testExportSnapshot2() throws DBStoreException, IOException { +// final String exportDir = hConfig.getString("rocksdb.snapshot_path"); +// deleteDir(new File(hConfig.getString("rocksdb.data_path"))); +// deleteDir(new File(exportDir)); +// FileUtils.forceMkdir(new File(exportDir)); +// +// RocksDBSession session = factory.createGraphDB(graphName, false); +// +// long seqNum = session.getLatestSequenceNumber(); +// System.out.println(seqNum); +// insertData(500); +// System.out.println(session.getLatestSequenceNumber()); +// String startKey = String.format("%d_%08d", partition_1, index - 200); +// String endKey = String.format("%d_%08d", partition_1, index - 100); +// +// ScanIterator cfIterator = session.sessionOp().scanRaw(startKey.getBytes(), endKey +// .getBytes(), seqNum); +// while (cfIterator.hasNext()) { +// ScanIterator iterator = cfIterator.next(); +// System.out.println("cf name is " + new String(cfIterator.position())); +// while (iterator.hasNext()) { +// RocksDBSession.BackendColumn col = iterator.next(); +// System.out.println(new String(col.name)); +// } +// System.out.println("-----------------------------------------"); +// iterator.close(); +// } +// cfIterator.close(); +// } +// +// // @Test +// public void testImportSnapshot2() throws DBStoreException, IOException { +// final String importDir = "/tmp/snapshot"; +// deleteDir(new File("/tmp/rocksdb/data" + "/" + graphName2)); +// RocksDBSession session = factory.createGraphDB(graphName2, false); +// SessionOperator sessionOp = session.sessionOp(); +// +// +// session.flush(); +// +// String table = "t1"; +// +// String k1 = String.format("%d_%08d", partition_1, 100); +// String k2 = String.format("%d_%08d", partition_1, 109); +// String k3 = String.format("%d_%08d", partition_1, 110); +// +// printKV(sessionOp, table, k1); +// printKV(sessionOp, table, k2); +// printKV(sessionOp, table, k3); +// +// } +// +// +// private static boolean deleteDir(File dir) { +// if (dir.isDirectory()) +// for (File file : dir.listFiles()) +// deleteDir(file); +// return dir.delete(); +// } +} diff --git a/hugegraph-store/hg-store-rocksdb/src/test/java/place-holder.txt b/hugegraph-store/hg-store-rocksdb/src/test/java/place-holder.txt new file mode 100644 index 0000000000..e69de29bb2 diff --git a/hugegraph-store/hg-store-rocksdb/src/test/resources/hugegraph-2.properties b/hugegraph-store/hg-store-rocksdb/src/test/resources/hugegraph-2.properties new file mode 100644 index 0000000000..b31ba417c2 --- /dev/null +++ b/hugegraph-store/hg-store-rocksdb/src/test/resources/hugegraph-2.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# rocksdb backend config +rocksdb.data_path=./tmp/rocksdb-2/data +rocksdb.wal_path=./tmp/rocksdb-2/data +rocksdb.snapshot_path=./tmp/rocksdb-2/snapshot +rocksdb.write_buffer_size=1048576 +rocksdb.min_write_buffer_number_to_merge=2 +rocksdb.max_write_buffer_number=4 +rocksdb.level0_file_num_compaction_trigger=2 +rocksdb.target_file_size_base=1048576 +rocksdb.target_file_size_multiplier=2 +rocksdb.max_bytes_for_level_base=1048576 +rocksdb.max_bytes_for_level_multiplier=2 diff --git a/hugegraph-store/hg-store-rocksdb/src/test/resources/hugegraph.properties b/hugegraph-store/hg-store-rocksdb/src/test/resources/hugegraph.properties new file mode 100644 index 0000000000..bcd4abc309 --- /dev/null +++ b/hugegraph-store/hg-store-rocksdb/src/test/resources/hugegraph.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# rocksdb backend config +rocksdb.data_path=./tmp/rocksdb/data +rocksdb.wal_path=./tmp/rocksdb/data +rocksdb.snapshot_path=./tmp/rocksdb/snapshot +rocksdb.write_buffer_size=1048576 +rocksdb.min_write_buffer_number_to_merge=2 +rocksdb.max_write_buffer_number=4 +rocksdb.level0_file_num_compaction_trigger=2 +rocksdb.target_file_size_base=1048576 +rocksdb.target_file_size_multiplier=2 +rocksdb.max_bytes_for_level_base=1048576 +rocksdb.max_bytes_for_level_multiplier=2 diff --git a/hugegraph-store/hg-store-rocksdb/src/test/resources/log4j2.xml b/hugegraph-store/hg-store-rocksdb/src/test/resources/log4j2.xml new file mode 100644 index 0000000000..2295e21aef --- /dev/null +++ b/hugegraph-store/hg-store-rocksdb/src/test/resources/log4j2.xml @@ -0,0 +1,30 @@ + + + + + + + + + + + + + + + diff --git a/hugegraph-store/pom.xml b/hugegraph-store/pom.xml index 0bb8cd96e1..dc74b6f439 100644 --- a/hugegraph-store/pom.xml +++ b/hugegraph-store/pom.xml @@ -38,8 +38,8 @@ hg-store-grpc hg-store-client hg-store-test + hg-store-rocksdb - @@ -67,11 +67,11 @@ hg-store-grpc ${project.version} - - - - - + + org.apache.hugegraph + hg-store-rocksdb + ${project.version} + org.apache.hugegraph hg-store-client @@ -254,7 +254,7 @@ - +