diff --git a/pixels-common/src/main/resources/pixels.properties b/pixels-common/src/main/resources/pixels.properties
index cdfad1b847..700eb3f3d0 100644
--- a/pixels-common/src/main/resources/pixels.properties
+++ b/pixels-common/src/main/resources/pixels.properties
@@ -336,6 +336,56 @@ index.rockset.persistent.cache.path=/tmp/cache
# cache size for rockset (rocksdb-cloud)
index.rockset.persistent.cache.size.gb=1
index.rockset.read.only=false
+# rockset key fixed prefix length
+index.rockset.prefix.length=4
+# Rockset: whether each index corresponds to its own column family
+index.rockset.multicf=true
+# rockset data path
+index.rockset.data.path=/tmp/rockset
+# rockset write buffer size (default to 32MB)
+index.rockset.write.buffer.size=33554432
+# rockset max write buffer number (default to 3)
+index.rockset.max.write.buffer.number=3
+# rockset max background flush threads (default to 2)
+index.rockset.max.background.flushes=2
+# rockset max background compactions (default to 4)
+index.rockset.max.background.compactions=4
+# rockset max open files (default to 4096)
+index.rockset.max.open.files=4096
+# rockset block cache capacity (default to 1GB)
+index.rockset.block.cache.capacity=1073741824
+# rockset block cache shard bits (default to 6, i.e., 64 shards)
+index.rockset.block.cache.shard.bits=6
+# rockset block size (default to 16KB)
+index.rockset.block.size=16384
+# rockset min write buffer number to merge (default to 2)
+index.rockset.min.write.buffer.number.to.merge=2
+# rockset level-0 file number compaction trigger (default to 4)
+index.rockset.level0.file.num.compaction.trigger=4
+# rockset max bytes for level base (default to 256MB)
+index.rockset.max.bytes.for.level.base=268435456
+# rockset max bytes for level multiplier (default to 10)
+index.rockset.max.bytes.for.level.multiplier=10
+# rockset target file size base (default to 64MB)
+index.rockset.target.file.size.base=67108864
+# rockset file size multiplier (default to 1)
+index.rockset.target.file.size.multiplier=1
+# rockset max subcompactions
+index.rockset.max.subcompactions=1
+# rockset compression type
+index.rockset.compression.type=LZ4_COMPRESSION
+# rockset bottommost compression type
+index.rockset.bottommost.compression.type=ZSTD_COMPRESSION
+# Rockset compaction style (e.g. UNIVERSAL, LEVEL)
+index.rockset.compaction.style=LEVEL
+# Whether to enable Rockset internal statistics collection
+index.rockset.stats.enabled=false
+# Directory where Rockset will write statistics logs
+index.rockset.stats.path=/tmp/rocksetStats
+# Time interval (in seconds) between statistics dumps
+index.rockset.stats.interval=60
+# Time interval (in seconds) between rockset memory usage logs
+index.rockset.log.interval=10
# rocksdb data path
index.rocksdb.data.path=/tmp/rocksdb
# rocksdb write buffer size (default to 64MB)
@@ -367,7 +417,7 @@ index.rocksdb.target.file.size.base=67108864
# rocksdb file size multiplier (default to 1)
index.rocksdb.target.file.size.multiplier=1
# rocksdb key fixed prefix length
-index.rocksdb.prefix.length=12
+index.rocksdb.prefix.length=4
# rocksdb max subcompactions
index.rocksdb.max.subcompactions=1
# rocksdb compression type (e.g. NO_COMPRESSION, SNAPPY_COMPRESSION, ZLIB_COMPRESSION, BZ2_COMPRESSION, LZ4_COMPRESSION, LZ4HC_COMPRESSION, ZSTD_COMPRESSION)
@@ -391,7 +441,7 @@ index.cache.capacity=10000000
# The expiration time (in seconds) of cache entries
index.cache.expiration.seconds=3600
# whether each index corresponds to its own column family
-index.rocksdb.multicf=false
+index.rocksdb.multicf=true
index.bucket.num=128
# the directory where the sqlite files of main index are stored, each main index is stored as a sqlite file
index.sqlite.path=/tmp/sqlite
diff --git a/pixels-index/pixels-index-rocksdb/pom.xml b/pixels-index/pixels-index-rocksdb/pom.xml
index 2e019d349f..7cf0f69986 100644
--- a/pixels-index/pixels-index-rocksdb/pom.xml
+++ b/pixels-index/pixels-index-rocksdb/pom.xml
@@ -15,8 +15,6 @@
8
8
UTF-8
-
- 10.2.1
diff --git a/pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBFactory.java b/pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBFactory.java
index a6441818c4..5b1fd91ed9 100644
--- a/pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBFactory.java
+++ b/pixels-index/pixels-index-rocksdb/src/main/java/io/pixelsdb/pixels/index/rocksdb/RocksDBFactory.java
@@ -189,7 +189,9 @@ private static ColumnFamilyDescriptor createCFDescriptor(byte[] name, Integer ke
int fixedLengthPrefix = Integer.parseInt(config.getProperty("index.rocksdb.prefix.length"));
if (keyLen != null)
{
- fixedLengthPrefix = keyLen + Long.BYTES; // key buffer + index id
+ // Prefix must only cover the logical lookup key.
+ // It must not include the encoded timestamp suffix.
+ fixedLengthPrefix = keyLen + (multiCF ? 0 : Long.BYTES);
}
CompactionStyle compactionStyle = CompactionStyle.valueOf(config.getProperty("index.rocksdb.compaction.style"));
diff --git a/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRocksDBIndex.java b/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRocksDBIndex.java
index 67ed0306eb..4212bf054f 100644
--- a/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRocksDBIndex.java
+++ b/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRocksDBIndex.java
@@ -25,39 +25,52 @@
import io.pixelsdb.pixels.common.exception.SinglePointIndexException;
import io.pixelsdb.pixels.common.index.IndexOption;
import io.pixelsdb.pixels.common.index.SinglePointIndex;
+import io.pixelsdb.pixels.common.utils.ConfigFactory;
+import io.pixelsdb.pixels.common.utils.IndexUtils;
import io.pixelsdb.pixels.index.IndexProto;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import static io.pixelsdb.pixels.index.rocksdb.RocksDBIndex.toBuffer;
import static io.pixelsdb.pixels.index.rocksdb.RocksDBIndex.toKeyBuffer;
+import static io.pixelsdb.pixels.index.rocksdb.RocksDBIndex.startsWith;
import static org.junit.jupiter.api.Assertions.*;
public class TestRocksDBIndex
{
+ private static final boolean MULTI_CF =
+ Boolean.parseBoolean(ConfigFactory.Instance().getProperty("index.rocksdb.multicf"));
+ private static final Logger log = LoggerFactory.getLogger(TestRocksDBIndex.class);
private RocksDB rocksDB;
private static final long TABLE_ID = 100L;
private static final long INDEX_ID = 100L;
+ private static final int VNODE_ID = 0;
private SinglePointIndex uniqueIndex;
private SinglePointIndex nonUniqueIndex;
-
+ private ColumnFamilyHandle columnFamilyHandle;
@BeforeEach
public void setUp() throws RocksDBException,SinglePointIndexException
{
IndexOption option = IndexOption.builder()
- .vNodeId(0)
+ .vNodeId(VNODE_ID)
.build();
uniqueIndex = new RocksDBIndex(TABLE_ID, INDEX_ID, true, option);
nonUniqueIndex = new RocksDBIndex(TABLE_ID, INDEX_ID + 1, false, option);
rocksDB = RocksDBFactory.getRocksDB();
+ columnFamilyHandle = RocksDBFactory.getOrCreateColumnFamily(TABLE_ID, INDEX_ID, VNODE_ID);
}
@AfterEach
@@ -77,7 +90,7 @@ public void tearDown() throws SinglePointIndexException
public void testPutEntry() throws RocksDBException, SinglePointIndexException
{
// Create Entry
- byte[] key = "testPutEntry".getBytes();
+ byte[] key = ByteBuffer.allocate(4).putInt(1).array();
long timestamp = 1000L;
long rowId = 100L;
@@ -91,7 +104,7 @@ public void testPutEntry() throws RocksDBException, SinglePointIndexException
ByteBuffer valueBuffer = RocksDBThreadResources.getValueBuffer();
ReadOptions readOptions = new ReadOptions();
// Assert index has been written to rocksDB
- int ret = rocksDB.get(readOptions, keyBuffer, valueBuffer);
+ int ret = rocksDB.get(columnFamilyHandle, readOptions, keyBuffer, valueBuffer);
assertTrue(ret != RocksDB.NOT_FOUND);
long storedRowId = valueBuffer.getLong();
@@ -172,6 +185,50 @@ public void testGetUniqueRowId() throws SinglePointIndexException
assertEquals(rowId2, result, "getUniqueRowId should return the rowId of the latest timestamp entry");
}
+    /**
+     * Stores several versions of one logical key, then seeks with timestamps between them and
+     * expects each seek to land on the newest stored version that is not newer than the probe.
+     */
+    @Test
+    public void testSeekFindsNextVersionWithSameLogicalPrefix() throws Exception
+    {
+        // 4-byte key, same length as the default index.rocksdb.prefix.length
+        byte[] key = ByteBuffer.allocate(4).putInt(7).array();
+        long[] storedTimestamps = {1L, 3L, 5L, 7L, 9L};
+        long[] seekTimestamps = {1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L};
+        long[] expectedTimestamps = {1L, 1L, 3L, 3L, 5L, 5L, 7L, 7L, 9L, 9L};
+
+        for (long timestamp : storedTimestamps)
+        {
+            // the stored value is not inspected by this test, so the timestamp is reused for it
+            uniqueIndex.putEntry(indexKey(key, timestamp), timestamp);
+        }
+
+        ReadOptions readOptions = RocksDBThreadResources.getReadOptions();
+        readOptions.setPrefixSameAsStart(true)
+                .setTotalOrderSeek(false)
+                .setVerifyChecksums(false);
+
+        for (int i = 0; i < seekTimestamps.length; i++)
+        {
+            long seekTimestamp = seekTimestamps[i];
+            long expectedTimestamp = expectedTimestamps[i];
+            ByteBuffer seekKey = toKeyBuffer(indexKey(key, seekTimestamp));
+
+            try (RocksIterator iterator = rocksDB.newIterator(columnFamilyHandle, readOptions))
+            {
+                iterator.seek(seekKey);
+                assertTrue(iterator.isValid(), "seek should find a version for timestamp " + seekTimestamp);
+                assertTrue(startsWith(ByteBuffer.wrap(iterator.key()), seekKey),
+                        "seek should remain within the same logical prefix for timestamp " + seekTimestamp);
+                long getTs = extractTimestampFromUniqueKey(iterator.key());
+                log.debug("Timestamp: {}", getTs);
+                assertEquals(expectedTimestamp, getTs,
+                        "seek should land on the closest stored version for timestamp " + seekTimestamp);
+            }
+        }
+    }
+
@Test
public void testGetRowIds() throws SinglePointIndexException
{
@@ -207,6 +264,21 @@ public void testGetRowIds() throws SinglePointIndexException
assertTrue(rowIds.containsAll(result) && result.containsAll(rowIds), "getRowIds should return the rowId of all entries");
}
+    /** Builds the {@code IndexKey} of {@code INDEX_ID} for the given key bytes and timestamp. */
+    private static IndexProto.IndexKey indexKey(byte[] key, long timestamp)
+    {
+        IndexProto.IndexKey.Builder keyBuilder = IndexProto.IndexKey.newBuilder();
+        keyBuilder.setIndexId(INDEX_ID).setKey(ByteString.copyFrom(key));
+        keyBuilder.setTimestamp(timestamp);
+        return keyBuilder.build();
+    }
+
+    /** Returns {@code Long.MAX_VALUE} minus the big-endian long held in the last 8 bytes of the key. */
+    private static long extractTimestampFromUniqueKey(byte[] encodedKey)
+    {
+        return Long.MAX_VALUE - ByteBuffer.wrap(encodedKey).getLong(encodedKey.length - Long.BYTES);
+    }
+
@Test
public void testDeleteEntry() throws SinglePointIndexException
{
@@ -396,4 +468,4 @@ public void benchmarkDeleteEntry() throws SinglePointIndexException
double durationMs = (end - start) / 1_000_000.0;
System.out.printf("Deleted %,d entries in %.2f ms (%.2f ops/sec)%n", count, durationMs, count * 1000.0 / durationMs);
}
-}
\ No newline at end of file
+}
diff --git a/pixels-index/pixels-index-rockset/src/main/java/io/pixelsdb/pixels/index/rockset/RocksetFactory.java b/pixels-index/pixels-index-rockset/src/main/java/io/pixelsdb/pixels/index/rockset/RocksetFactory.java
index c6632d8d59..6aa3c0f564 100644
--- a/pixels-index/pixels-index-rockset/src/main/java/io/pixelsdb/pixels/index/rockset/RocksetFactory.java
+++ b/pixels-index/pixels-index-rockset/src/main/java/io/pixelsdb/pixels/index/rockset/RocksetFactory.java
@@ -203,9 +203,11 @@ private static RocksetColumnFamilyDescriptor createCFDescriptor(byte[] name, Int
int targetFileSizeMultiplier = Integer.parseInt(config.getProperty("index.rockset.target.file.size.multiplier"));
RocksetCompactionStyle compactionStyle = RocksetCompactionStyle.valueOf(config.getProperty("index.rockset.compaction.style"));
int fixedLengthPrefix = Integer.parseInt(config.getProperty("index.rockset.prefix.length"));
- if(keyLen != null)
+ if (keyLen != null)
{
- fixedLengthPrefix = keyLen + Long.BYTES; // key buffer + index id
+ // Prefix must only cover the logical lookup key.
+ // It must not include the encoded timestamp suffix.
+ fixedLengthPrefix = keyLen + (multiCF ? 0 : Long.BYTES);
}
// Compression Options
@@ -377,4 +379,3 @@ private static void startRocksetLogThread(RocksetDB db)
}, 0, logInterval, TimeUnit.SECONDS);
}
}
-
diff --git a/pixels-index/pixels-index-rockset/src/test/java/io/pixelsdb/pixels/index/rockset/TestRocksetIndex.java b/pixels-index/pixels-index-rockset/src/test/java/io/pixelsdb/pixels/index/rockset/TestRocksetIndex.java
index 3b1eeaf632..a6e2215aff 100644
--- a/pixels-index/pixels-index-rockset/src/test/java/io/pixelsdb/pixels/index/rockset/TestRocksetIndex.java
+++ b/pixels-index/pixels-index-rockset/src/test/java/io/pixelsdb/pixels/index/rockset/TestRocksetIndex.java
@@ -21,42 +21,48 @@
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
-import io.pixelsdb.pixels.common.index.SinglePointIndex;
-import io.pixelsdb.pixels.common.index.SinglePointIndexFactory;
import io.pixelsdb.pixels.common.exception.MainIndexException;
import io.pixelsdb.pixels.common.exception.SinglePointIndexException;
import io.pixelsdb.pixels.common.index.SinglePointIndex;
import io.pixelsdb.pixels.common.index.IndexOption;
+import io.pixelsdb.pixels.common.utils.ConfigFactory;
import io.pixelsdb.pixels.index.IndexProto;
+import io.pixelsdb.pixels.index.rockset.jni.*;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
-import io.pixelsdb.pixels.index.rockset.jni.*;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import static io.pixelsdb.pixels.index.rockset.RocksetIndex.startsWith;
import static io.pixelsdb.pixels.index.rockset.RocksetIndex.toKeyBuffer;
import static org.junit.jupiter.api.Assertions.*;
public class TestRocksetIndex
{
+ private static final boolean MULTI_CF =
+ Boolean.parseBoolean(ConfigFactory.Instance().getProperty("index.rockset.multicf"));
private RocksetDB rocksetDB;
private static final long TABLE_ID = 100L;
private static final long INDEX_ID = 100L;
+ private static final int VNODE_ID = 0;
private SinglePointIndex uniqueIndex;
private SinglePointIndex nonUniqueIndex;
+ private RocksetColumnFamilyHandle columnFamilyHandle;
@BeforeEach
public void setUp() throws Exception
{
IndexOption option = IndexOption.builder()
- .vNodeId(0)
+ .vNodeId(VNODE_ID)
.build();
uniqueIndex = new RocksetIndex(TABLE_ID, INDEX_ID, true, option);
nonUniqueIndex = new RocksetIndex(TABLE_ID, INDEX_ID + 1, false, option);
+ rocksetDB = RocksetFactory.getRocksetDB();
+ columnFamilyHandle = RocksetFactory.getOrCreateColumnFamily(TABLE_ID, INDEX_ID, VNODE_ID);
}
@AfterEach
@@ -76,7 +82,7 @@ public void tearDown() throws SinglePointIndexException
public void testPutEntry() throws SinglePointIndexException
{
// Create Entry
- byte[] key = "testPutEntry".getBytes();
+ byte[] key = ByteBuffer.allocate(4).putInt(3622).array();
long timestamp = 1000L;
long rowId = 100L;
@@ -144,7 +150,7 @@ public void testPutEntries() throws SinglePointIndexException, MainIndexExceptio
@Test
public void testGetUniqueRowId() throws SinglePointIndexException
{
- byte[] key = "testGetUniqueRowId".getBytes();
+ byte[] key = ByteBuffer.allocate(4).putInt(2).array();
long timestamp1 = 1000L;
long timestamp2 = timestamp1 + 1000; // newer
@@ -171,6 +177,42 @@ public void testGetUniqueRowId() throws SinglePointIndexException
assertEquals(rowId2, result, "getUniqueRowId should return the rowId of the latest timestamp entry");
}
+    /** Probes one versioned key; each seek must hit the newest version not newer than the probe. */
+    @Test
+    public void testSeekFindsNextVersionWithSameLogicalPrefix() throws Exception
+    {
+        assertTrue(MULTI_CF, "This test assumes multi-CF layout for logical-key prefix matching");
+
+        final byte[] logicalKey = ByteBuffer.allocate(4).putInt(7).array();
+        final long[] written = {1L, 3L, 5L, 7L, 9L};
+        final long[] probes = {1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L};
+        final long[] wanted = {1L, 1L, 3L, 3L, 5L, 5L, 7L, 7L, 9L, 9L};
+
+        for (int w = 0; w < written.length; w++)
+        {
+            uniqueIndex.putEntry(indexKey(logicalKey, written[w]), written[w]);
+        }
+
+        RocksetReadOptions prefixSeekOptions = RocksetThreadResources.getReadOptions();
+        prefixSeekOptions.setPrefixSameAsStart(true);
+
+        for (int probeIdx = 0; probeIdx < probes.length; probeIdx++)
+        {
+            long probe = probes[probeIdx];
+            ByteBuffer target = toKeyBuffer(indexKey(logicalKey, probe));
+
+            try (RocksetIterator cursor = rocksetDB.newIterator(columnFamilyHandle, prefixSeekOptions))
+            {
+                cursor.seek(target);
+                assertTrue(cursor.isValid(), "seek should find a version for timestamp " + probe);
+                assertTrue(startsWith(ByteBuffer.wrap(cursor.key()), target),
+                        "seek should remain within the same logical prefix for timestamp " + probe);
+                assertEquals(wanted[probeIdx], extractTimestampFromUniqueKey(cursor.key()),
+                        "seek should land on the closest stored version for timestamp " + probe);
+            }
+        }
+    }
+
@Test
public void testGetRowIds() throws SinglePointIndexException
{
@@ -206,6 +248,21 @@ public void testGetRowIds() throws SinglePointIndexException
assertTrue(rowIds.containsAll(result) && result.containsAll(rowIds), "getRowIds should return the rowId of all entries");
}
+    /** Builds the {@code IndexKey} of {@code INDEX_ID} for the given key bytes and timestamp. */
+    private static IndexProto.IndexKey indexKey(byte[] key, long timestamp)
+    {
+        IndexProto.IndexKey.Builder keyBuilder = IndexProto.IndexKey.newBuilder();
+        keyBuilder.setIndexId(INDEX_ID).setKey(ByteString.copyFrom(key));
+        keyBuilder.setTimestamp(timestamp);
+        return keyBuilder.build();
+    }
+
+    /** Returns {@code Long.MAX_VALUE} minus the big-endian long held in the last 8 bytes of the key. */
+    private static long extractTimestampFromUniqueKey(byte[] encodedKey)
+    {
+        return Long.MAX_VALUE - ByteBuffer.wrap(encodedKey).getLong(encodedKey.length - Long.BYTES);
+    }
+
@Test
public void testDeleteEntry() throws SinglePointIndexException
{
@@ -396,4 +453,3 @@ public void benchmarkDeleteEntry() throws SinglePointIndexException
System.out.printf("Deleted %,d entries in %.2f ms (%.2f ops/sec)%n", count, durationMs, count * 1000.0 / durationMs);
}
}
-
diff --git a/pom.xml b/pom.xml
index 78011c6a56..92e8d7c814 100644
--- a/pom.xml
+++ b/pom.xml
@@ -143,6 +143,8 @@
1.7.36
+ 10.2.1
+
4.13.2
1.8.2