Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.pixelsdb.pixels.common.physical.StorageFactory;
import io.pixelsdb.pixels.common.utils.ConfigFactory;
import io.pixelsdb.pixels.common.utils.DateUtil;
import io.pixelsdb.pixels.common.utils.RetinaUtils;
import io.pixelsdb.pixels.core.PixelsWriter;
import io.pixelsdb.pixels.core.TypeDescription;
import io.pixelsdb.pixels.core.vector.VectorizedRowBatch;
Expand Down Expand Up @@ -97,7 +98,7 @@ protected void processSourceFile(String originalFilePath) throws IOException, Me
// 1. Calculate Primary Key and Bucket ID
ByteString pkByteString = calculatePrimaryKeyBytes(colsInLine);
// Assume BucketCache has the necessary method and configuration
int bucketId = BucketCache.getBucketIdFromByteBuffer(pkByteString);
int bucketId = RetinaUtils.getBucketIdFromByteBuffer(pkByteString);

// 2. Get/Initialize the Writer for this Bucket
PerBucketWriter bucketWriter = bucketWriters.computeIfAbsent(bucketId, id ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,31 +85,6 @@ public static BucketCache getInstance()
return instance;
}

/**
* Calculates the bucketId on the hash ring for the input data.
* Uses MurmurHash3_32 and the modulo operation to constrain the result to the
* discrete range [0, bucketNum - 1].
* * @param byteString The input data to be hashed
*
* @return The calculated bucketId
*/
public static int getBucketIdFromByteBuffer(ByteString byteString)
{
// Get the singleton instance to access bucketNum
BucketCache cacheInstance = getInstance();

// 1. Calculate the hash using MurmurHash3_32
int hash = Hashing.murmur3_32_fixed()
.hashBytes(byteString.toByteArray())
.asInt();

// 2. Take the absolute value (MurmurHash3_32 can return negative integers)
int absHash = Math.abs(hash);

// 3. Apply modulo operation to compress the hash value to the range [0, bucketNum - 1]
return absHash % cacheInstance.bucketNum;
}

/**
* Core lookup method: Retrieves the corresponding RetinaNodeInfo for a given bucketId.
* Uses a cache-aside strategy (lazy loading) to populate the cache upon miss.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright 2025 PixelsDB.
*
* This file is part of Pixels.
*
* Pixels is free software: you can redistribute it and/or modify
* it under the terms of the Affero GNU General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* Pixels is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Affero GNU General Public License for more details.
*
* You should have received a copy of the Affero GNU General Public
* License along with Pixels. If not, see
* <https://www.gnu.org/licenses/>.
*/

package io.pixelsdb.pixels.common.utils;

import com.google.common.hash.Hashing;
import com.google.protobuf.ByteString;
import io.pixelsdb.pixels.common.node.BucketCache;
import io.pixelsdb.pixels.common.retina.RetinaService;

public class RetinaUtils
{
private static volatile RetinaUtils instance;
private final int bucketNum;
private final int defaultRetinaPort;

private RetinaUtils()
{
ConfigFactory config = ConfigFactory.Instance();
this.bucketNum = Integer.parseInt(config.getProperty("node.bucket.num"));
this.defaultRetinaPort = Integer.parseInt(config.getProperty("retina.server.port"));
}

private static RetinaUtils getInstance()
{
if (instance == null)
{
synchronized (RetinaUtils.class)
{
if (instance == null)
{
instance = new RetinaUtils();
}
}
}
return instance;
}

/**
* Calculates the bucketId on the hash ring for the input data.
* Uses MurmurHash3_32 and the modulo operation to constrain the result to the
* discrete range [0, bucketNum - 1].
* * @param byteString The input data to be hashed
*
* @return The calculated bucketId
*/
public static int getBucketIdFromByteBuffer(ByteString byteString)
{
// Get the singleton instance to access bucketNum
RetinaUtils retinaUtils = getInstance();

// 1. Calculate the hash using MurmurHash3_32
int hash = Hashing.murmur3_32_fixed()
.hashBytes(byteString.toByteArray())
.asInt();

// 2. Take the absolute value (MurmurHash3_32 can return negative integers)
int absHash = Math.abs(hash);

// 3. Apply modulo operation to compress the hash value to the range [0, bucketNum - 1]
return absHash % retinaUtils.bucketNum;
}

public static RetinaService getRetinaServiceFromBucketId(int bucketId)
{
String retinaHost = BucketCache.getInstance().getRetinaNodeInfoByBucketId(bucketId).getAddress();
return RetinaService.CreateInstance(retinaHost, getInstance().defaultRetinaPort);
}

public static RetinaService getRetinaServiceFromPath(String path)
{
String retinaHost = extractRetinaHostNameFromPath(path);
if(retinaHost == null || retinaHost.equals(Constants.LOAD_DEFAULT_RETINA_PREFIX))
{
return RetinaService.Instance();
}
return RetinaService.CreateInstance(retinaHost, getInstance().defaultRetinaPort);
}

private static String extractRetinaHostNameFromPath(String path)
{
if (path == null || path.isEmpty()) {
return null;
}
int lastSlashIndex = path.lastIndexOf('/');
String baseName = (lastSlashIndex == -1) ? path : path.substring(lastSlashIndex + 1);
int firstUnderscoreIndex = baseName.indexOf('_');
if (firstUnderscoreIndex > 0) {
// The substring from the start of baseName up to (but not including) the first underscore is the hostname.
return baseName.substring(0, firstUnderscoreIndex);
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class PixelsRecordReaderBufferImpl implements PixelsRecordReader
private ByteBuffer data;
private final byte[] activeMemtableData;
private VectorizedRowBatch curRowBatch = null;

private final String retinaHost;
/**
* Columns included by reader option; if included, set true
*/
Expand Down Expand Up @@ -104,6 +104,7 @@ private static boolean checkBit(RetinaProto.VisibilityBitmap bitmap, int k)
}

public PixelsRecordReaderBufferImpl(PixelsReaderOption option,
String retinaHost,
byte[] activeMemtableData, List<Long> fileIds, // read version
List<RetinaProto.VisibilityBitmap> visibilityBitmap,
Storage storage,
Expand All @@ -113,6 +114,7 @@ public PixelsRecordReaderBufferImpl(PixelsReaderOption option,
{
ConfigFactory configFactory = ConfigFactory.Instance();
this.retinaBufferStorageFolder = configFactory.getProperty("retina.buffer.object.storage.folder");
this.retinaHost = retinaHost;
this.retinaEnabled = Boolean.parseBoolean(configFactory.getProperty("retina.enable"));

this.option = option;
Expand Down Expand Up @@ -349,7 +351,7 @@ public void close() throws IOException

private String getRetinaBufferStoragePathFromId(long entryId)
{
return this.retinaBufferStorageFolder + String.format("%d/%d", tableId, entryId);
return this.retinaBufferStorageFolder + String.format("%d/%s_%d", tableId, retinaHost, entryId);
}

private void getMemtableDataFromStorage(String path) throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.pixelsdb.pixels.common.physical.SchedulerFactory;
import io.pixelsdb.pixels.common.retina.RetinaService;
import io.pixelsdb.pixels.common.utils.ConfigFactory;
import io.pixelsdb.pixels.common.utils.RetinaUtils;
import io.pixelsdb.pixels.core.PixelsFooterCache;
import io.pixelsdb.pixels.core.PixelsProto;
import io.pixelsdb.pixels.core.TypeDescription;
Expand All @@ -57,7 +58,7 @@
public class PixelsRecordReaderImpl implements PixelsRecordReader
{
private static final Logger logger = LogManager.getLogger(PixelsRecordReaderImpl.class);
private final RetinaService retinaService = RetinaService.Instance();
private final RetinaService retinaService;

private final PhysicalReader physicalReader;
private final PixelsProto.PostScript postScript;
Expand Down Expand Up @@ -175,6 +176,7 @@ public PixelsRecordReaderImpl(PhysicalReader physicalReader,
this.pixelsFooterCache = pixelsFooterCache;
this.filePath = this.physicalReader.getPath();
this.includedColumnTypes = new ArrayList<>();
this.retinaService = RetinaUtils.getRetinaServiceFromPath(this.filePath);
// Issue #175: this check is currently not necessary.
// requireNonNull(TransContextCache.Instance().getQueryTransInfo(this.transId),
// "The transaction context does not contain query (trans) id '" + this.transId + "'");
Expand Down