diff --git a/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/IndexedPixelsConsumer.java b/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/IndexedPixelsConsumer.java
index cf2aa45c66..30c4b4bdad 100644
--- a/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/IndexedPixelsConsumer.java
+++ b/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/IndexedPixelsConsumer.java
@@ -33,6 +33,7 @@
 import io.pixelsdb.pixels.common.physical.StorageFactory;
 import io.pixelsdb.pixels.common.utils.ConfigFactory;
 import io.pixelsdb.pixels.common.utils.DateUtil;
+import io.pixelsdb.pixels.common.utils.RetinaUtils;
 import io.pixelsdb.pixels.core.PixelsWriter;
 import io.pixelsdb.pixels.core.TypeDescription;
 import io.pixelsdb.pixels.core.vector.VectorizedRowBatch;
@@ -97,7 +98,7 @@ protected void processSourceFile(String originalFilePath) throws IOException, Me
                 // 1. Calculate Primary Key and Bucket ID
                 ByteString pkByteString = calculatePrimaryKeyBytes(colsInLine);
-                // Assume BucketCache has the necessary method and configuration
-                int bucketId = BucketCache.getBucketIdFromByteBuffer(pkByteString);
+                // RetinaUtils derives the bucket from the configured node.bucket.num
+                int bucketId = RetinaUtils.getBucketIdFromByteBuffer(pkByteString);
 
                 // 2. Get/Initialize the Writer for this Bucket
                 PerBucketWriter bucketWriter = bucketWriters.computeIfAbsent(bucketId, id ->
diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/node/BucketCache.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/node/BucketCache.java
index ea0ca460d4..4c55177e21 100644
--- a/pixels-common/src/main/java/io/pixelsdb/pixels/common/node/BucketCache.java
+++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/node/BucketCache.java
@@ -85,31 +85,6 @@ public static BucketCache getInstance()
         return instance;
     }
 
-    /**
-     * Calculates the bucketId on the hash ring for the input data.
-     * Uses MurmurHash3_32 and the modulo operation to constrain the result to the
-     * discrete range [0, bucketNum - 1].
-     * * @param byteString The input data to be hashed
-     *
-     * @return The calculated bucketId
-     */
-    public static int getBucketIdFromByteBuffer(ByteString byteString)
-    {
-        // Get the singleton instance to access bucketNum
-        BucketCache cacheInstance = getInstance();
-
-        // 1. Calculate the hash using MurmurHash3_32
-        int hash = Hashing.murmur3_32_fixed()
-                .hashBytes(byteString.toByteArray())
-                .asInt();
-
-        // 2. Take the absolute value (MurmurHash3_32 can return negative integers)
-        int absHash = Math.abs(hash);
-
-        // 3. Apply modulo operation to compress the hash value to the range [0, bucketNum - 1]
-        return absHash % cacheInstance.bucketNum;
-    }
-
     /**
      * Core lookup method: Retrieves the corresponding RetinaNodeInfo for a given bucketId.
      * Uses a cache-aside strategy (lazy loading) to populate the cache upon miss.
diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/RetinaUtils.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/RetinaUtils.java
new file mode 100644
index 0000000000..497aa6ae9f
--- /dev/null
+++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/RetinaUtils.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * <https://www.gnu.org/licenses/>.
+ */
+
+package io.pixelsdb.pixels.common.utils;
+
+import com.google.common.hash.Hashing;
+import com.google.protobuf.ByteString;
+import io.pixelsdb.pixels.common.node.BucketCache;
+import io.pixelsdb.pixels.common.retina.RetinaService;
+
+/**
+ * Static helpers for mapping primary keys to buckets and for obtaining the
+ * {@link RetinaService} client that serves a given bucket or file path.
+ */
+public class RetinaUtils
+{
+    private static volatile RetinaUtils instance;
+    private final int bucketNum;
+    private final int defaultRetinaPort;
+
+    private RetinaUtils()
+    {
+        ConfigFactory config = ConfigFactory.Instance();
+        this.bucketNum = Integer.parseInt(config.getProperty("node.bucket.num"));
+        this.defaultRetinaPort = Integer.parseInt(config.getProperty("retina.server.port"));
+    }
+
+    private static RetinaUtils getInstance()
+    {
+        if (instance == null)
+        {
+            synchronized (RetinaUtils.class)
+            {
+                if (instance == null)
+                {
+                    instance = new RetinaUtils();
+                }
+            }
+        }
+        return instance;
+    }
+
+    /**
+     * Calculates the bucketId on the hash ring for the input data.
+     * Uses MurmurHash3_32 and the modulo operation to constrain the result to the
+     * discrete range [0, bucketNum - 1].
+     *
+     * @param byteString The input data to be hashed
+     * @return The calculated bucketId
+     */
+    public static int getBucketIdFromByteBuffer(ByteString byteString)
+    {
+        // Get the singleton instance to access bucketNum
+        RetinaUtils retinaUtils = getInstance();
+
+        // 1. Calculate the hash using MurmurHash3_32
+        int hash = Hashing.murmur3_32_fixed()
+                .hashBytes(byteString.toByteArray())
+                .asInt();
+
+        // 2. Reduce before taking the absolute value. Math.abs(Integer.MIN_VALUE) is still
+        //    negative, so abs-then-modulo could return a negative bucketId for that one hash.
+        //    The remainder is always greater than -bucketNum, so its absolute value is safe,
+        //    and every other hash maps to the same bucket as Math.abs(hash) % bucketNum.
+        return Math.abs(hash % retinaUtils.bucketNum);
+    }
+
+    /**
+     * Creates a client for the retina node that owns the given bucket.
+     * The node address is looked up through {@link BucketCache}; the port is the
+     * configured {@code retina.server.port}.
+     *
+     * @param bucketId the bucket whose retina node should be contacted
+     * @return the retina service created for that node
+     */
+    public static RetinaService getRetinaServiceFromBucketId(int bucketId)
+    {
+        String retinaHost = BucketCache.getInstance().getRetinaNodeInfoByBucketId(bucketId).getAddress();
+        return RetinaService.CreateInstance(retinaHost, getInstance().defaultRetinaPort);
+    }
+
+    /**
+     * Resolves the retina service from a file path whose base name starts with
+     * {@code <retinaHost>_}. Falls back to {@link RetinaService#Instance()} when the
+     * path carries no host prefix or carries {@link Constants#LOAD_DEFAULT_RETINA_PREFIX}.
+     * NOTE(review): every base name with an underscore after its first character is
+     * treated as host-prefixed; confirm that no other file naming scheme reaches here.
+     *
+     * @param path the file path to inspect, may be null or empty
+     * @return the retina service for the host encoded in the path, or the default instance
+     */
+    public static RetinaService getRetinaServiceFromPath(String path)
+    {
+        String retinaHost = extractRetinaHostNameFromPath(path);
+        if (retinaHost == null || retinaHost.equals(Constants.LOAD_DEFAULT_RETINA_PREFIX))
+        {
+            return RetinaService.Instance();
+        }
+        return RetinaService.CreateInstance(retinaHost, getInstance().defaultRetinaPort);
+    }
+
+    /**
+     * Extracts the text before the first underscore of the path's base name.
+     *
+     * @param path the file path, may be null or empty
+     * @return the host name prefix, or null if the path is null or empty or its base name has no underscore after its first character
+     */
+    private static String extractRetinaHostNameFromPath(String path)
+    {
+        if (path == null || path.isEmpty())
+        {
+            return null;
+        }
+        int lastSlashIndex = path.lastIndexOf('/');
+        String baseName = (lastSlashIndex == -1) ? path : path.substring(lastSlashIndex + 1);
+        int firstUnderscoreIndex = baseName.indexOf('_');
+        if (firstUnderscoreIndex > 0)
+        {
+            // The substring from the start of baseName up to (but not including) the first underscore is the hostname.
+            return baseName.substring(0, firstUnderscoreIndex);
+        }
+        return null;
+    }
+}
diff --git a/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/PixelsRecordReaderBufferImpl.java b/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/PixelsRecordReaderBufferImpl.java
index aff92b80f3..7d16881300 100644
--- a/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/PixelsRecordReaderBufferImpl.java
+++ b/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/PixelsRecordReaderBufferImpl.java
@@ -49,7 +49,7 @@ public class PixelsRecordReaderBufferImpl implements PixelsRecordReader
     private ByteBuffer data;
     private final byte[] activeMemtableData;
     private VectorizedRowBatch curRowBatch = null;
-
+    private final String retinaHost;
     /**
      * Columns included by reader option; if included, set true
      */
@@ -104,6 +104,7 @@ private static boolean checkBit(RetinaProto.VisibilityBitmap bitmap, int k)
     }
 
     public PixelsRecordReaderBufferImpl(PixelsReaderOption option,
+                                        String retinaHost,
                                         byte[] activeMemtableData, List fileIds, // read version
                                         List visibilityBitmap,
                                         Storage storage,
@@ -113,6 +114,7 @@ public PixelsRecordReaderBufferImpl(PixelsReaderOption option,
     {
         ConfigFactory configFactory = ConfigFactory.Instance();
         this.retinaBufferStorageFolder = configFactory.getProperty("retina.buffer.object.storage.folder");
+        this.retinaHost = retinaHost;
         this.retinaEnabled = Boolean.parseBoolean(configFactory.getProperty("retina.enable"));
 
         this.option = option;
@@ -349,7 +351,7 @@ public void close() throws IOException
 
     private String getRetinaBufferStoragePathFromId(long entryId)
     {
-        return this.retinaBufferStorageFolder + String.format("%d/%d", tableId, entryId);
+        return this.retinaBufferStorageFolder + String.format("%d/%s_%d", tableId, retinaHost, entryId);
     }
 
     private void getMemtableDataFromStorage(String path) throws IOException
diff --git a/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/PixelsRecordReaderImpl.java b/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/PixelsRecordReaderImpl.java
index 0c638a9735..7eb707135c 100644
--- a/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/PixelsRecordReaderImpl.java
+++ b/pixels-core/src/main/java/io/pixelsdb/pixels/core/reader/PixelsRecordReaderImpl.java
@@ -31,6 +31,7 @@
 import io.pixelsdb.pixels.common.physical.SchedulerFactory;
 import io.pixelsdb.pixels.common.retina.RetinaService;
 import io.pixelsdb.pixels.common.utils.ConfigFactory;
+import io.pixelsdb.pixels.common.utils.RetinaUtils;
 import io.pixelsdb.pixels.core.PixelsFooterCache;
 import io.pixelsdb.pixels.core.PixelsProto;
 import io.pixelsdb.pixels.core.TypeDescription;
@@ -57,7 +58,7 @@ public class PixelsRecordReaderImpl implements PixelsRecordReader
 {
     private static final Logger logger = LogManager.getLogger(PixelsRecordReaderImpl.class);
 
-    private final RetinaService retinaService = RetinaService.Instance();
+    private final RetinaService retinaService;
     private final PhysicalReader physicalReader;
     private final PixelsProto.PostScript postScript;
 
@@ -175,6 +176,7 @@ public PixelsRecordReaderImpl(PhysicalReader physicalReader,
         this.pixelsFooterCache = pixelsFooterCache;
         this.filePath = this.physicalReader.getPath();
         this.includedColumnTypes = new ArrayList<>();
+        this.retinaService = RetinaUtils.getRetinaServiceFromPath(this.filePath);
         // Issue #175: this check is currently not necessary.
         // requireNonNull(TransContextCache.Instance().getQueryTransInfo(this.transId),
         //         "The transaction context does not contain query (trans) id '" + this.transId + "'");