diff --git a/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/executor/LoadExecutor.java b/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/executor/LoadExecutor.java index ebd6b53d7a..98e15ffb5d 100644 --- a/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/executor/LoadExecutor.java +++ b/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/executor/LoadExecutor.java @@ -19,9 +19,7 @@ */ package io.pixelsdb.pixels.cli.executor; -import io.pixelsdb.pixels.cli.load.Consumer; -import io.pixelsdb.pixels.cli.load.Parameters; -import io.pixelsdb.pixels.cli.load.PixelsConsumer; +import io.pixelsdb.pixels.cli.load.*; import io.pixelsdb.pixels.common.exception.MetadataException; import io.pixelsdb.pixels.common.exception.RetinaException; import io.pixelsdb.pixels.common.metadata.MetadataService; @@ -32,10 +30,11 @@ import io.pixelsdb.pixels.common.retina.RetinaService; import io.pixelsdb.pixels.common.transaction.TransContext; import io.pixelsdb.pixels.common.transaction.TransService; +import io.pixelsdb.pixels.common.utils.ConfigFactory; import io.pixelsdb.pixels.core.encoding.EncodingLevel; +import io.pixelsdb.pixels.daemon.NodeProto; import net.sourceforge.argparse4j.inf.Namespace; -import java.util.Iterator; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; @@ -47,7 +46,7 @@ */ public class LoadExecutor implements CommandExecutor { - private final RetinaService retinaService = RetinaService.Instance(); + private final RetinaService defaultRetinaService = RetinaService.Instance(); @Override public void execute(Namespace ns, String command) throws Exception @@ -80,26 +79,34 @@ public void execute(Namespace ns, String command) throws Exception // source already exist, producer option is false, add list of source to the queue List fileList = storage.listPaths(origin); BlockingQueue inputFiles = new LinkedBlockingQueue<>(fileList.size()); - ConcurrentLinkedQueue loadedFiles = new ConcurrentLinkedQueue<>(); - ConcurrentLinkedQueue loadedPaths = new ConcurrentLinkedQueue<>(); + ConcurrentLinkedQueue loadedInfos = new ConcurrentLinkedQueue<>(); for (String filePath : fileList) { inputFiles.add(storage.ensureSchemePrefix(filePath)); } long startTime = System.currentTimeMillis(); - if (startConsumers(threadNum, inputFiles, parameters, loadedFiles, loadedPaths)) + if (startConsumers(threadNum, inputFiles, parameters, loadedInfos)) { - Iterator fileIterator = loadedFiles.iterator(); - Iterator pathIterator = loadedPaths.iterator(); - while (fileIterator.hasNext() && pathIterator.hasNext()) + int retinaServerPort = Integer.parseInt(ConfigFactory.Instance().getProperty("retina.server.port")); + for(LoadedInfo loadedInfo : loadedInfos) { - File file = fileIterator.next(); - Path path = pathIterator.next(); + File file = loadedInfo.loadedFile; + Path path = loadedInfo.loadedPath; metadataService.updateFile(file); try { - retinaService.addVisibility(File.getFilePath(path, file)); + + NodeProto.NodeInfo nodeInfo = loadedInfo.loadedRetinaNode; + if(nodeInfo == null) + { + defaultRetinaService.addVisibility(File.getFilePath(path, file)); + } else + { + RetinaService retinaService = RetinaService.CreateInstance(nodeInfo.getAddress(), retinaServerPort); + retinaService.addVisibility(File.getFilePath(path, file)); + } + } catch (RetinaException e) { System.out.println("add visibility for ordered file '" + file + "' failed"); @@ -123,11 +130,11 @@ public void execute(Namespace ns, String command) throws Exception * @param concurrency the number of threads for data loading * @param inputFiles the queue of the paths of input files * @param parameters the parameters for data loading, e.g., the schema name and table name - * @param loadedFiles the information of the loaded pixels files + * @param loadedInfos the information of the loaded pixels files * @return true if consumers complete successfully */ private boolean startConsumers(int concurrency, BlockingQueue inputFiles, Parameters parameters, - ConcurrentLinkedQueue loadedFiles, ConcurrentLinkedQueue loadedPaths) + ConcurrentLinkedQueue loadedInfos) { boolean success = false; try @@ -147,7 +154,14 @@ private boolean startConsumers(int concurrency, BlockingQueue inputFiles { for (int i = 0; i < concurrency; i++) { - PixelsConsumer pixelsConsumer = new PixelsConsumer(inputFiles, parameters, loadedFiles, loadedPaths); + AbstractPixelsConsumer pixelsConsumer; + if(parameters.getIndex() == null) + { + pixelsConsumer = new SimplePixelsConsumer(inputFiles, parameters, loadedInfos); + } else + { + pixelsConsumer = new IndexedPixelsConsumer(inputFiles, parameters, loadedInfos); + } consumers[i] = pixelsConsumer; pixelsConsumer.start(); } diff --git a/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/AbstractPixelsConsumer.java b/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/AbstractPixelsConsumer.java new file mode 100644 index 0000000000..0075c9766c --- /dev/null +++ b/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/AbstractPixelsConsumer.java @@ -0,0 +1,269 @@ +/* + * Copyright 2025 PixelsDB. + * + * This file is part of Pixels. + * + * Pixels is free software: you can redistribute it and/or modify + * it under the terms of the Affero GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * Pixels is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Affero GNU General Public License for more details. + * + * You should have received a copy of the Affero GNU General Public + * License along with Pixels. If not, see + * . + */ + +package io.pixelsdb.pixels.cli.load; + +import io.pixelsdb.pixels.common.exception.MetadataException; +import io.pixelsdb.pixels.common.index.IndexService; +import io.pixelsdb.pixels.common.index.IndexServiceProvider; +import io.pixelsdb.pixels.common.metadata.MetadataService; +import io.pixelsdb.pixels.common.metadata.domain.File; +import io.pixelsdb.pixels.common.metadata.domain.Path; +import io.pixelsdb.pixels.common.metadata.domain.SinglePointIndex; +import io.pixelsdb.pixels.common.physical.Storage; +import io.pixelsdb.pixels.common.utils.ConfigFactory; +import io.pixelsdb.pixels.common.utils.DateUtil; +import io.pixelsdb.pixels.core.PixelsWriter; +import io.pixelsdb.pixels.core.PixelsWriterImpl; +import io.pixelsdb.pixels.core.TypeDescription; +import io.pixelsdb.pixels.core.encoding.EncodingLevel; +import io.pixelsdb.pixels.core.vector.ColumnVector; +import io.pixelsdb.pixels.core.vector.VectorizedRowBatch; +import io.pixelsdb.pixels.daemon.NodeProto; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Pattern; + +/** + * @author hank + * @author: tao + * @create in 2018-10-30 15:18 + **/ +public abstract class AbstractPixelsConsumer extends Consumer +{ + public static final AtomicInteger GlobalTargetPathId = new AtomicInteger(0); + protected final BlockingQueue inputFiles; + protected final Parameters parameters; + protected final ConcurrentLinkedQueue loadedInfos; + protected final List tmpFiles = new ArrayList<>(); + protected final List targetPaths; + protected MetadataService metadataService; + // Shared configurations + protected TypeDescription schema; + protected int maxRowNum; + protected String regex; + protected EncodingLevel encodingLevel; + protected boolean nullsPadding; + protected SinglePointIndex index; + protected IndexService defaultIndexService; + protected int[] pkMapping; + protected int[] orderMapping; + protected TypeDescription pkTypeDescription; + // Shared Pixels file configs + protected int pixelStride; + protected int rowGroupSize; + protected long blockSize; + protected short replication; + + protected AbstractPixelsConsumer(BlockingQueue inputFiles, Parameters parameters, + ConcurrentLinkedQueue loadedInfos) + { + this.inputFiles = inputFiles; + this.parameters = parameters; + this.loadedInfos = loadedInfos; + this.metadataService = parameters.getMetadataService(); + this.targetPaths = parameters.getLoadingPaths(); + initializeCommonConfig(); + } + + private void initializeCommonConfig() + { + // Initialization logic moved from original run() + this.schema = TypeDescription.fromString(parameters.getSchema()); + this.maxRowNum = parameters.getMaxRowNum(); + this.regex = parameters.getRegex().equals("\\s") ? " " : parameters.getRegex(); + this.encodingLevel = parameters.getEncodingLevel(); + this.nullsPadding = parameters.isNullsPadding(); + this.orderMapping = parameters.getOrderMapping(); + this.pkMapping = parameters.getPkMapping(); + this.index = parameters.getIndex(); + this.pkTypeDescription = parameters.getPkTypeDescription(); + this.defaultIndexService = IndexServiceProvider.getService(IndexServiceProvider.ServiceMode.rpc); + + ConfigFactory configFactory = ConfigFactory.Instance(); + this.pixelStride = Integer.parseInt(configFactory.getProperty("pixel.stride")); + this.rowGroupSize = Integer.parseInt(configFactory.getProperty("row.group.size")); + this.blockSize = Long.parseLong(configFactory.getProperty("block.size")); + this.replication = Short.parseShort(configFactory.getProperty("block.replication")); + } + + @Override + public void run() + { + System.out.println("Start " + this.getClass().getSimpleName() + ", " + currentThread().getName() + + ", time: " + DateUtil.formatTime(new Date())); + int count = 0; + boolean isRunning = true; + + try + { + while (isRunning) + { + String originalFilePath = inputFiles.poll(2, TimeUnit.SECONDS); + if (originalFilePath != null) + { + count++; + processSourceFile(originalFilePath); + } else + { + // No source file can be consumed within 2 seconds, loading is considered finished. + isRunning = false; + } + } + + // Flush any remaining data from writers + flushRemainingData(); + + } catch (InterruptedException e) + { + System.out.println(this.getClass().getSimpleName() + ": " + e.getMessage()); + currentThread().interrupt(); + } catch (Throwable e) + { + e.printStackTrace(); + } finally + { + cleanupTemporaryFiles(); + System.out.println(currentThread().getName() + ":" + count); + System.out.println("Exit " + this.getClass().getSimpleName() + ", thread: " + currentThread().getName() + + ", time: " + DateUtil.formatTime(new Date())); + } + } + + protected abstract void processSourceFile(String originalFilePath) throws IOException, MetadataException; + + protected abstract void flushRemainingData() throws IOException, MetadataException; + + private void cleanupTemporaryFiles() + { + for (File tmpFile : tmpFiles) + { + if (tmpFile.getType() == File.Type.TEMPORARY) + { + try + { + metadataService.deleteFiles(Collections.singletonList((tmpFile.getId()))); + } catch (MetadataException e) + { + e.printStackTrace(); + } + } + } + } + + /** + * Close the pixels writer and add the file to loaded file queue. + * Files in the loaded files queue will be updated in metadata. + * + * @param pixelsWriter the pixels writer + * @param loadedFile the file name has been loaded + * @param filePath the path of the directory where the file was written + * @throws IOException + */ + protected void closeWriterAndAddFile(PixelsWriter pixelsWriter, File loadedFile, Path filePath, NodeProto.NodeInfo nodeInfo) throws IOException + { + pixelsWriter.close(); + loadedFile.setType(File.Type.REGULAR); + loadedFile.setNumRowGroup(pixelsWriter.getNumRowGroup()); + LoadedInfo loadedInfo = new LoadedInfo(); + loadedInfo.loadedFile = loadedFile; + loadedInfo.loadedPath = filePath; + loadedInfo.loadedRetinaNode = nodeInfo; + this.loadedInfos.add(loadedInfo); + } + + /** + * Create a temporary file through the metadata service + * + * @param fileName the file name without directory path + * @param filePath the path of the directory where the file was written + */ + protected File openTmpFile(String fileName, Path filePath) throws MetadataException + { + File file = new File(); + file.setName(fileName); + file.setType(File.Type.TEMPORARY); + file.setNumRowGroup(1); + file.setPathId(filePath.getId()); + String tmpFilePath = filePath.getUri() + "/" + fileName; + this.metadataService.addFiles(Collections.singletonList(file)); + file.setId(metadataService.getFileId(tmpFilePath)); + return file; + } + + protected PixelsWriter getPixelsWriter(Storage targetStorage, String targetFilePath) + { + return PixelsWriterImpl.newBuilder() + .setSchema(schema) + .setHasHiddenColumn(true) + .setPixelStride(pixelStride) + .setRowGroupSize(rowGroupSize) + .setStorage(targetStorage) + .setPath(targetFilePath) + .setBlockSize(blockSize) + .setReplication(replication) + .setBlockPadding(true) + .setEncodingLevel(encodingLevel) + .setNullsPadding(nullsPadding) + .setCompressionBlockSize(1) + .build(); + } + + protected void writeRowToBatch(VectorizedRowBatch rowBatch, String line, long timestamp) + { + String[] colsInLine = line.split(Pattern.quote(regex)); + writeRowToBatch(rowBatch, colsInLine, timestamp); + } + + protected void writeRowToBatch(VectorizedRowBatch rowBatch, String[] colsInLine, long timestamp) + { + ColumnVector[] columnVectors = rowBatch.cols; + rowBatch.size++; + + for (int i = 0; i < columnVectors.length - 1; i++) + { + try + { + int valueIdx = orderMapping[i]; + if (valueIdx >= colsInLine.length || + colsInLine[valueIdx].isEmpty() || + colsInLine[valueIdx].equalsIgnoreCase("\\N")) + { + columnVectors[i].addNull(); + } else + { + columnVectors[i].add(colsInLine[valueIdx]); + } + } catch (Exception e) + { + e.printStackTrace(); + } + } + columnVectors[columnVectors.length - 1].add(timestamp); + } +} diff --git a/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/IndexedPixelsConsumer.java b/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/IndexedPixelsConsumer.java new file mode 100644 index 0000000000..cf2aa45c66 --- /dev/null +++ b/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/IndexedPixelsConsumer.java @@ -0,0 +1,298 @@ + /* + * Copyright 2025 PixelsDB. + * + * This file is part of Pixels. + * + * Pixels is free software: you can redistribute it and/or modify + * it under the terms of the Affero GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * Pixels is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Affero GNU General Public License for more details. + * + * You should have received a copy of the Affero GNU General Public + * License along with Pixels. If not, see + * . + */ + + package io.pixelsdb.pixels.cli.load; + + import com.google.protobuf.ByteString; + import io.pixelsdb.pixels.common.exception.IndexException; + import io.pixelsdb.pixels.common.exception.MetadataException; + import io.pixelsdb.pixels.common.index.IndexService; + import io.pixelsdb.pixels.common.index.RPCIndexService; + import io.pixelsdb.pixels.common.index.RowIdAllocator; + import io.pixelsdb.pixels.common.metadata.domain.File; + import io.pixelsdb.pixels.common.metadata.domain.Path; + import io.pixelsdb.pixels.common.node.BucketCache; + import io.pixelsdb.pixels.common.physical.Storage; + import io.pixelsdb.pixels.common.physical.StorageFactory; + import io.pixelsdb.pixels.common.utils.ConfigFactory; + import io.pixelsdb.pixels.common.utils.DateUtil; + import io.pixelsdb.pixels.core.PixelsWriter; + import io.pixelsdb.pixels.core.TypeDescription; + import io.pixelsdb.pixels.core.vector.VectorizedRowBatch; + import io.pixelsdb.pixels.daemon.NodeProto; + import io.pixelsdb.pixels.index.IndexProto; + + import java.io.BufferedReader; + import java.io.IOException; + import java.io.InputStreamReader; + import java.nio.ByteBuffer; + import java.util.ArrayList; + import java.util.LinkedList; + import java.util.List; + import java.util.Map; + import java.util.concurrent.BlockingQueue; + import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ConcurrentLinkedQueue; + import java.util.regex.Pattern; + + /** + * Consumer implementation for loading data with Primary Index (Index != null). + * It routes data rows to different PixelsFiles based on their calculated bucketId. + */ + public class IndexedPixelsConsumer extends AbstractPixelsConsumer + { + + // Map: Bucket ID -> Writer state + private final Map bucketWriters = new ConcurrentHashMap<>(); + private final BucketCache bucketCache = BucketCache.getInstance(); + private final Map indexServices = new ConcurrentHashMap<>(); + private final int indexServerPort; + + public IndexedPixelsConsumer(BlockingQueue queue, Parameters parameters, + ConcurrentLinkedQueue loadedInfos) + { + super(queue, parameters, loadedInfos); + ConfigFactory config = ConfigFactory.Instance(); + this.indexServerPort = Integer.parseInt(config.getProperty("index.server.port")); + } + + @Override + protected void processSourceFile(String originalFilePath) throws IOException, MetadataException + { + Storage originStorage = StorageFactory.Instance().getStorage(originalFilePath); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(originStorage.open(originalFilePath)))) + { + + System.out.println("loading indexed data from: " + originalFilePath); + long timestamp = parameters.getTimestamp(); + String line; + + while ((line = reader.readLine()) != null) + { + if (line.isEmpty()) + { + System.err.println("thread: " + currentThread().getName() + " got empty line."); + continue; + } + + String[] colsInLine = line.split(Pattern.quote(regex)); + + // 1. Calculate Primary Key and Bucket ID + ByteString pkByteString = calculatePrimaryKeyBytes(colsInLine); + // Assume BucketCache has the necessary method and configuration + int bucketId = BucketCache.getBucketIdFromByteBuffer(pkByteString); + + // 2. Get/Initialize the Writer for this Bucket + PerBucketWriter bucketWriter = bucketWriters.computeIfAbsent(bucketId, id -> + { + try + { + return initializeBucketWriter(id); + } catch (Exception e) + { + throw new RuntimeException("Failed to initialize writer for bucket " + id, e); + } + }); + + // 3. Write Data Row + writeRowToBatch(bucketWriter.rowBatch, colsInLine, timestamp); + bucketWriter.rowCounter++; + + try + { + // 4. Update Index Entry + updateIndexEntry(bucketWriter, pkByteString); + + // 5. Check and Flush Row Batch + if (bucketWriter.rowBatch.size >= bucketWriter.rowBatch.getMaxSize()) + { + flushRowBatch(bucketWriter); + } + + // 6. Check and Close File + if (bucketWriter.rowCounter >= maxRowNum) + { + closePixelsFile(bucketWriter); + // Remove writer to force re-initialization on next use + bucketWriters.remove(bucketId); + } + } catch (IndexException e) + { + e.printStackTrace(); + } + } + } + } + + @Override + protected void flushRemainingData() throws IOException, MetadataException + { + for (PerBucketWriter bucketWriter : bucketWriters.values()) + { + if (bucketWriter.rowCounter > 0) + { + try + { + closePixelsFile(bucketWriter); + } catch (IndexException e) + { + e.printStackTrace(); + } + } + } + bucketWriters.clear(); + } + + /** + * Initializes a new PixelsWriter and associated File/Path for a given bucket ID. + */ + private PerBucketWriter initializeBucketWriter(int bucketId) throws IOException, MetadataException + { + // Use the Node Cache to find the responsible Retina Node + NodeProto.NodeInfo targetNode = bucketCache.getRetinaNodeInfoByBucketId(bucketId); + + // Target path selection logic (simple round-robin for the path, but the NodeInfo is bucket-specific) + int targetPathId = GlobalTargetPathId.getAndIncrement() % targetPaths.size(); + Path currTargetPath = targetPaths.get(targetPathId); + String targetDirPath = currTargetPath.getUri(); + Storage targetStorage = StorageFactory.Instance().getStorage(targetDirPath); + + if (!targetDirPath.endsWith("/")) + { + targetDirPath += "/"; + } + String targetFileName = targetNode.getAddress() + "_" + DateUtil.getCurTime() + "_" + bucketId + ".pxl"; + String targetFilePath = targetDirPath + targetFileName; + + PixelsWriter pixelsWriter = getPixelsWriter(targetStorage, targetFilePath); + + File currFile = openTmpFile(targetFileName, currTargetPath); + tmpFiles.add(currFile); + + return new PerBucketWriter(pixelsWriter, currFile, currTargetPath, targetNode); + } + + // --- Private Helper Methods --- + + private ByteString calculatePrimaryKeyBytes(String[] colsInLine) + { + TypeDescription pkTypeDescription = parameters.getPkTypeDescription(); + List pkBytes = new LinkedList<>(); + int indexKeySize = 0; + + for (int i = 0; i < pkMapping.length; i++) + { + int pkColumnId = pkMapping[i]; + // Safety check for array bounds + if (pkColumnId >= colsInLine.length) + { + throw new IllegalArgumentException("Primary key mapping index out of bounds for line."); + } + byte[] bytes = pkTypeDescription.getChildren().get(i).convertSqlStringToByte(colsInLine[pkColumnId]); + pkBytes.add(bytes); + indexKeySize += bytes.length; + } + + ByteBuffer indexKeyBuffer = ByteBuffer.allocate(indexKeySize); + for (byte[] pkByte : pkBytes) + { + indexKeyBuffer.put(pkByte); + } + return ByteString.copyFrom((ByteBuffer) indexKeyBuffer.rewind()); + } + + private void updateIndexEntry(PerBucketWriter bucketWriter, ByteString pkByteString) throws IndexException + { + IndexProto.PrimaryIndexEntry.Builder builder = IndexProto.PrimaryIndexEntry.newBuilder(); + builder.getIndexKeyBuilder() + .setTimestamp(parameters.getTimestamp()) + .setKey(pkByteString) + .setIndexId(index.getId()) + .setTableId(index.getTableId()); + + builder.setRowId(bucketWriter.rowIdAllocator.getRowId()); + builder.getRowLocationBuilder() + .setRgId(bucketWriter.rgId) + .setFileId(bucketWriter.currFile.getId()) + .setRgRowOffset(bucketWriter.rgRowOffset++); + + bucketWriter.indexEntries.add(builder.build()); + } + + private void flushRowBatch(PerBucketWriter bucketWriter) throws IOException, IndexException + { + bucketWriter.pixelsWriter.addRowBatch(bucketWriter.rowBatch); + bucketWriter.rowBatch.reset(); + + bucketWriter.rgId = bucketWriter.pixelsWriter.getNumRowGroup(); + if (bucketWriter.prevRgId != bucketWriter.rgId) + { + bucketWriter.rgRowOffset = 0; + bucketWriter.prevRgId = bucketWriter.rgId; + } + + // Push index entries to the corresponding IndexService (determined by targetNode address) + bucketWriter.indexService.putPrimaryIndexEntries(index.getTableId(), index.getId(), bucketWriter.indexEntries); + bucketWriter.indexEntries.clear(); + } + + private void closePixelsFile(PerBucketWriter bucketWriter) throws IOException, MetadataException, IndexException + { + // Final flush of remaining rows/indexes + if (bucketWriter.rowBatch.size != 0) + { + flushRowBatch(bucketWriter); + } + + closeWriterAndAddFile(bucketWriter.pixelsWriter, bucketWriter.currFile, bucketWriter.currTargetPath, bucketWriter.targetNode); + } + + private class PerBucketWriter + { + PixelsWriter pixelsWriter; + File currFile; + Path currTargetPath; + int rgId; + int rgRowOffset; + int prevRgId; + int rowCounter; + NodeProto.NodeInfo targetNode; + List indexEntries = new ArrayList<>(); + VectorizedRowBatch rowBatch; + IndexService indexService; + RowIdAllocator rowIdAllocator; + + public PerBucketWriter(PixelsWriter writer, File file, Path path, NodeProto.NodeInfo node) + { + this.pixelsWriter = writer; + this.currFile = file; + this.currTargetPath = path; + this.targetNode = node; + this.rgId = writer.getNumRowGroup(); + this.prevRgId = this.rgId; + this.rgRowOffset = 0; + this.rowCounter = 0; + this.rowBatch = schema.createRowBatchWithHiddenColumn(pixelStride, TypeDescription.Mode.NONE); + this.indexService = indexServices.computeIfAbsent(node, nodeInfo -> + RPCIndexService.CreateInstance(nodeInfo.getAddress(), indexServerPort)); + this.rowIdAllocator = new RowIdAllocator(index.getTableId(), 1000, this.indexService); + } + } + } \ No newline at end of file diff --git a/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/LoadedInfo.java b/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/LoadedInfo.java new file mode 100644 index 0000000000..be909b7e63 --- /dev/null +++ b/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/LoadedInfo.java @@ -0,0 +1,32 @@ + /* + * Copyright 2025 PixelsDB. + * + * This file is part of Pixels. + * + * Pixels is free software: you can redistribute it and/or modify + * it under the terms of the Affero GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * Pixels is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Affero GNU General Public License for more details. + * + * You should have received a copy of the Affero GNU General Public + * License along with Pixels. If not, see + * . + */ + + package io.pixelsdb.pixels.cli.load; + +import io.pixelsdb.pixels.common.metadata.domain.File; +import io.pixelsdb.pixels.common.metadata.domain.Path; +import io.pixelsdb.pixels.daemon.NodeProto; + +public class LoadedInfo +{ + public File loadedFile; + public Path loadedPath; + public NodeProto.NodeInfo loadedRetinaNode; +} diff --git a/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/Parameters.java b/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/Parameters.java index c32819c540..a12323e672 100644 --- a/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/Parameters.java +++ b/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/Parameters.java @@ -50,7 +50,6 @@ public class Parameters private final MetadataService metadataService; private final long transId; private final long timestamp; - private RowIdAllocator rowIdAllocator; private SinglePointIndex index; public List getLoadingPaths() @@ -92,11 +91,6 @@ public boolean isNullsPadding() public long getTimestamp() { return timestamp; } - public RowIdAllocator getRowIdAllocator() - { - return rowIdAllocator; - } - public int[] getPkMapping() { return pkMapping; @@ -220,7 +214,6 @@ public boolean initExtra() throws MetadataException, InterruptedException if(index != null) { - rowIdAllocator = new RowIdAllocator(table.getId(), 1000, IndexServiceProvider.ServiceMode.rpc); int[] orderKeyColIds = new int[index.getKeyColumns().getKeyColumnIds().size()]; List orderKeyColNames = new LinkedList<>(); List orderKeyColTypes = new LinkedList<>(); diff --git a/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/PixelsConsumer.java b/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/PixelsConsumer.java deleted file mode 100644 index 8a3f6746bb..0000000000 --- a/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/PixelsConsumer.java +++ /dev/null @@ -1,378 +0,0 @@ -/* - * Copyright 2025 PixelsDB. - * - * This file is part of Pixels. - * - * Pixels is free software: you can redistribute it and/or modify - * it under the terms of the Affero GNU General Public License as - * published by the Free Software Foundation, either version 3 of - * the License, or (at your option) any later version. - * - * Pixels is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Affero GNU General Public License for more details. - * - * You should have received a copy of the Affero GNU General Public - * License along with Pixels. If not, see - * . - */ -package io.pixelsdb.pixels.cli.load; - -import com.google.protobuf.ByteString; -import io.pixelsdb.pixels.common.exception.MetadataException; -import io.pixelsdb.pixels.common.index.IndexService; -import io.pixelsdb.pixels.common.index.IndexServiceProvider; -import io.pixelsdb.pixels.common.index.RowIdAllocator; -import io.pixelsdb.pixels.common.metadata.MetadataService; -import io.pixelsdb.pixels.common.metadata.domain.File; -import io.pixelsdb.pixels.common.metadata.domain.Path; -import io.pixelsdb.pixels.common.metadata.domain.SinglePointIndex; -import io.pixelsdb.pixels.common.physical.Storage; -import io.pixelsdb.pixels.common.physical.StorageFactory; -import io.pixelsdb.pixels.common.utils.ConfigFactory; -import io.pixelsdb.pixels.common.utils.DateUtil; -import io.pixelsdb.pixels.core.PixelsWriter; -import io.pixelsdb.pixels.core.PixelsWriterImpl; -import io.pixelsdb.pixels.core.TypeDescription; -import io.pixelsdb.pixels.core.encoding.EncodingLevel; -import io.pixelsdb.pixels.core.vector.ColumnVector; -import io.pixelsdb.pixels.core.vector.VectorizedRowBatch; -import io.pixelsdb.pixels.index.IndexProto; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.nio.ByteBuffer; -import java.util.*; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.regex.Pattern; - -/** - * @author: tao - * @author hank - * @create in 2018-10-30 15:18 - **/ -public class PixelsConsumer extends Consumer -{ - public static final AtomicInteger GlobalTargetPathId = new AtomicInteger(0); - private final BlockingQueue queue; - private final Parameters parameters; - private final ConcurrentLinkedQueue loadedFiles; - private final ConcurrentLinkedQueue loadedPaths; - private final List tmpFiles = new ArrayList<>(); - private MetadataService metadataService; - - public PixelsConsumer(BlockingQueue queue, Parameters parameters, - ConcurrentLinkedQueue loadedFiles, ConcurrentLinkedQueue loadedPaths) - { - this.queue = queue; - this.parameters = parameters; - this.loadedFiles = loadedFiles; - this.loadedPaths = loadedPaths; - this.metadataService = parameters.getMetadataService(); - } - - @Override - public void run() - { - System.out.println("Start PixelsConsumer, " + currentThread().getName() + - ", time: " + DateUtil.formatTime(new Date())); - int count = 0; - - boolean isRunning = true; - try - { - final List targetPaths = parameters.getLoadingPaths(); - String schemaStr = parameters.getSchema(); - int[] orderMapping = parameters.getOrderMapping(); - int maxRowNum = parameters.getMaxRowNum(); - String regex = parameters.getRegex(); - EncodingLevel encodingLevel = parameters.getEncodingLevel(); - boolean nullsPadding = parameters.isNullsPadding(); - RowIdAllocator rowIdAllocator = parameters.getRowIdAllocator(); - int[] pkMapping = parameters.getPkMapping(); - SinglePointIndex index = parameters.getIndex(); - IndexService indexService = IndexServiceProvider.getService(IndexServiceProvider.ServiceMode.rpc); - - if (regex.equals("\\s")) - { - regex = " "; - } - - ConfigFactory configFactory = ConfigFactory.Instance(); - int pixelStride = Integer.parseInt(configFactory.getProperty("pixel.stride")); - int rowGroupSize = Integer.parseInt(configFactory.getProperty("row.group.size")); - long blockSize = Long.parseLong(configFactory.getProperty("block.size")); - short replication = Short.parseShort(configFactory.getProperty("block.replication")); - - TypeDescription schema = TypeDescription.fromString(schemaStr); - VectorizedRowBatch rowBatch = schema.createRowBatchWithHiddenColumn(pixelStride, TypeDescription.Mode.NONE); - ColumnVector[] columnVectors = rowBatch.cols; - - BufferedReader reader; - String line; - - boolean initPixelsFile = true; - String targetFileName = null; - String targetFilePath; - PixelsWriter pixelsWriter = null; - Path currTargetPath = null; - int rowCounter = 0; - int rgId = 0; - int prevRgId = 0; - int rgRowOffset = 0; - File currFile = null; - List indexEntries = new ArrayList<>(); - - while (isRunning) - { - String originalFilePath = queue.poll(2, TimeUnit.SECONDS); - if (originalFilePath != null) - { - count++; - Storage originStorage = StorageFactory.Instance().getStorage(originalFilePath); - reader = new BufferedReader(new InputStreamReader(originStorage.open(originalFilePath))); - - System.out.println("loading data from: " + originalFilePath); - - // loaded rows use the same timestamp - long timestamp = parameters.getTimestamp(); - while ((line = reader.readLine()) != null) - { - if(line.isEmpty()) - { - System.err.println("thread: " + currentThread().getName() + " got empty line."); - continue; - } - - if (initPixelsFile) - { - // we create a new pixels file if we can read a next line from the source file - - // choose the target output directory using round-robin - int targetPathId = GlobalTargetPathId.getAndIncrement() % targetPaths.size(); - currTargetPath = targetPaths.get(targetPathId); - String targetDirPath = currTargetPath.getUri(); - Storage targetStorage = StorageFactory.Instance().getStorage(targetDirPath); - - if (!targetDirPath.endsWith("/")) - { - targetDirPath += "/"; - } - targetFileName = DateUtil.getCurTime() + ".pxl"; - targetFilePath = targetDirPath + targetFileName; - - pixelsWriter = PixelsWriterImpl.newBuilder() - .setSchema(schema) - .setHasHiddenColumn(true) - .setPixelStride(pixelStride) - .setRowGroupSize(rowGroupSize) - .setStorage(targetStorage) - .setPath(targetFilePath) - .setBlockSize(blockSize) - .setReplication(replication) - .setBlockPadding(true) - .setEncodingLevel(encodingLevel) - .setNullsPadding(nullsPadding) - .setCompressionBlockSize(1) - .build(); - - currFile = openTmpFile(targetFileName, currTargetPath); - tmpFiles.add(currFile); - rgId = pixelsWriter.getNumRowGroup(); - rgRowOffset = 0; - } - initPixelsFile = false; - - rowBatch.size++; - rowCounter++; - - String[] colsInLine = line.split(Pattern.quote(regex)); - for (int i = 0; i < columnVectors.length - 1; i++) - { - try - { - int valueIdx = orderMapping[i]; - if (valueIdx >= colsInLine.length || - colsInLine[valueIdx].isEmpty() || - colsInLine[valueIdx].equalsIgnoreCase("\\N")) - { - columnVectors[i].addNull(); - } else - { - columnVectors[i].add(colsInLine[valueIdx]); - } - } - catch (Exception e) - { - System.out.println("line: " + line); - e.printStackTrace(); - } - } - // add hidden timestamp column value - columnVectors[columnVectors.length - 1].add(timestamp); - - if(index != null) - { - // TODO: Support Secondary Index - int indexKeySize = 0; - - TypeDescription pkTypeDescription = parameters.getPkTypeDescription(); - List pkBytes = new LinkedList<>(); - for(int i = 0; i < pkMapping.length; i++) - { - int pkColumnId = pkMapping[i]; - byte[] bytes = pkTypeDescription.getChildren().get(i).convertSqlStringToByte(colsInLine[pkColumnId]); - pkBytes.add(bytes); - indexKeySize += bytes.length; - } - ByteBuffer indexKeyBuffer = ByteBuffer.allocate(indexKeySize); - for(byte[] pkByte : pkBytes) - { - indexKeyBuffer.put(pkByte); - } - - IndexProto.PrimaryIndexEntry.Builder builder = IndexProto.PrimaryIndexEntry.newBuilder(); - builder.getIndexKeyBuilder() - .setTimestamp(parameters.getTimestamp()) - .setKey(ByteString.copyFrom((ByteBuffer) indexKeyBuffer.rewind())) - .setIndexId(index.getId()) - .setTableId(index.getTableId()); - builder.setRowId(rowIdAllocator.getRowId()); - builder.getRowLocationBuilder() - .setRgId(rgId) - .setFileId(currFile.getId()) - .setRgRowOffset(rgRowOffset++); - indexEntries.add(builder.build()); - } - - if (rowBatch.size >= rowBatch.getMaxSize()) - { - pixelsWriter.addRowBatch(rowBatch); - rowBatch.reset(); - rgId = pixelsWriter.getNumRowGroup(); - - if(prevRgId != rgId) - { - rgRowOffset = 0; - prevRgId = rgId; - } - - if(index != null) - { - indexService.putPrimaryIndexEntries(index.getTableId(), index.getId(), indexEntries); - indexEntries.clear(); - } - } - - if (rowCounter >= maxRowNum) - { - // finish writing the file - if (rowBatch.size != 0) - { - pixelsWriter.addRowBatch(rowBatch); - rowBatch.reset(); - if(index != null) - { - indexService.putPrimaryIndexEntries(index.getTableId(), index.getId(), indexEntries); - indexEntries.clear(); - } - } - closeWriterAndAddFile(pixelsWriter, currFile, currTargetPath); - rowCounter = 0; - initPixelsFile = true; - } - } - reader.close(); - } else - { - // no source file can be consumed within 2 seconds, - // loading is considered to be finished. - isRunning = false; - } - } - - if (rowCounter > 0) - { - // last file to write - if (rowBatch.size != 0) - { - pixelsWriter.addRowBatch(rowBatch); - rowBatch.reset(); - if(index != null) - { - indexService.putPrimaryIndexEntries(index.getTableId(), index.getId(), indexEntries); - indexEntries.clear(); - } - } - closeWriterAndAddFile(pixelsWriter, currFile, currTargetPath); - } - } catch (InterruptedException e) - { - System.out.println("PixelsConsumer: " + e.getMessage()); - currentThread().interrupt(); - } catch (Throwable e) - { - e.printStackTrace(); - } finally - { - for(File tmpFile: tmpFiles) - { - // If the file is successfully loaded, its type should have been locally modified to REGULAR. - // Otherwise, TEMPORARY files need to be cleaned up in the Metadata Service. - if(tmpFile.getType() == File.Type.TEMPORARY) - { - try - { - metadataService.deleteFiles(Collections.singletonList((tmpFile.getId()))); - } catch (MetadataException e) - { - e.printStackTrace(); - } - } - } - System.out.println(currentThread().getName() + ":" + count); - System.out.println("Exit PixelsConsumer, thread: " + currentThread().getName() + - ", time: " + DateUtil.formatTime(new Date())); - } - } - - /** - * Close the pixels writer and add the file to loaded file queue. - * Files in the loaded files queue will be updated in metadata. - * @param pixelsWriter the pixels writer - * @param loadedFile the file name has been loaded - * @param filePath the path of the directory where the file was written - * @throws IOException - */ - private void closeWriterAndAddFile(PixelsWriter pixelsWriter, File loadedFile, Path filePath) throws IOException - { - pixelsWriter.close(); - loadedFile.setType(File.Type.REGULAR); - loadedFile.setNumRowGroup(pixelsWriter.getNumRowGroup()); - this.loadedFiles.offer(loadedFile); - this.loadedPaths.offer(filePath); - } - - /** - * Create a temporary file through the metadata service - * @param fileName the file name without directory path - * @param filePath the path of the directory where the file was written - */ - private File openTmpFile(String fileName, Path filePath) throws MetadataException - { - File file = new File(); - file.setName(fileName); - file.setType(File.Type.TEMPORARY); - file.setNumRowGroup(1); - file.setPathId(filePath.getId()); - String tmpFilePath = filePath.getUri() + "/" + fileName; - this.metadataService.addFiles(Collections.singletonList(file)); - file.setId(metadataService.getFileId(tmpFilePath)); - return file; - } -} diff --git a/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/SimplePixelsConsumer.java b/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/SimplePixelsConsumer.java new file mode 100644 index 0000000000..72856c460a --- /dev/null +++ b/pixels-cli/src/main/java/io/pixelsdb/pixels/cli/load/SimplePixelsConsumer.java @@ -0,0 +1,166 @@ + /* + * Copyright 2025 PixelsDB. + * + * This file is part of Pixels. + * + * Pixels is free software: you can redistribute it and/or modify + * it under the terms of the Affero GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * Pixels is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Affero GNU General Public License for more details. + * + * You should have received a copy of the Affero GNU General Public + * License along with Pixels. If not, see + * . + */ + + package io.pixelsdb.pixels.cli.load; + + + import io.pixelsdb.pixels.common.exception.MetadataException; + import io.pixelsdb.pixels.common.metadata.domain.File; + import io.pixelsdb.pixels.common.metadata.domain.Path; + import io.pixelsdb.pixels.common.physical.Storage; + import io.pixelsdb.pixels.common.physical.StorageFactory; + import io.pixelsdb.pixels.common.utils.Constants; + import io.pixelsdb.pixels.common.utils.DateUtil; + import io.pixelsdb.pixels.core.PixelsWriter; + import io.pixelsdb.pixels.core.PixelsWriterImpl; + import io.pixelsdb.pixels.core.TypeDescription; + import io.pixelsdb.pixels.core.vector.VectorizedRowBatch; + + import java.io.BufferedReader; + import java.io.IOException; + import java.io.InputStreamReader; + import java.util.concurrent.BlockingQueue; + import java.util.concurrent.ConcurrentLinkedQueue; + + + /** + * Consumer implementation for loading data without any Primary Index (Index == null). + * Uses simple round-robin path selection. + */ + public class SimplePixelsConsumer extends AbstractPixelsConsumer + { + + private final VectorizedRowBatch rowBatch; + private PixelsWriter pixelsWriter; + private File currFile; + private Path currTargetPath; + private int rowCounter = 0; + + public SimplePixelsConsumer(BlockingQueue queue, Parameters parameters, + ConcurrentLinkedQueue loadedInfos) + { + super(queue, parameters, loadedInfos); + this.rowBatch = schema.createRowBatchWithHiddenColumn(pixelStride, TypeDescription.Mode.NONE); + } + + @Override + protected void processSourceFile(String originalFilePath) throws IOException, MetadataException + { + Storage originStorage = StorageFactory.Instance().getStorage(originalFilePath); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(originStorage.open(originalFilePath)))) + { + System.out.println("loading data from: " + originalFilePath); + long timestamp = parameters.getTimestamp(); + String line; + + while ((line = reader.readLine()) != null) + { + if (line.isEmpty()) + { + System.err.println("thread: " + currentThread().getName() + " got empty line."); + continue; + } + + // 1. Check if a new PixelsFile needs to be initialized + if (pixelsWriter == null) + { + initializePixelsWriter(); + } + + // 2. Write row to batch + writeRowToBatch(rowBatch, line, timestamp); + rowCounter++; + + // 3. Check and Flush Row Batch + if (rowBatch.size >= rowBatch.getMaxSize()) + { + pixelsWriter.addRowBatch(rowBatch); + rowBatch.reset(); + } + + // 4. Check and Close File + if (rowCounter >= maxRowNum) + { + closePixelsFile(); + } + } + } + } + + @Override + protected void flushRemainingData() throws IOException + { + if (rowCounter > 0) + { + closePixelsFile(); + } + } + + // --- Private Helper Methods for Simple Consumer --- + + private void initializePixelsWriter() throws MetadataException, IOException + { + int targetPathId = GlobalTargetPathId.getAndIncrement() % targetPaths.size(); + currTargetPath = targetPaths.get(targetPathId); + String targetDirPath = currTargetPath.getUri(); + Storage targetStorage = StorageFactory.Instance().getStorage(targetDirPath); + + if (!targetDirPath.endsWith("/")) + { + targetDirPath += "/"; + } + String targetFileName = Constants.LOAD_DEFAULT_RETINA_PREFIX + DateUtil.getCurTime() + ".pxl"; + String targetFilePath = targetDirPath + targetFileName; + + pixelsWriter = PixelsWriterImpl.newBuilder() + .setSchema(schema) + .setHasHiddenColumn(true) + .setPixelStride(pixelStride) + .setRowGroupSize(rowGroupSize) + .setStorage(targetStorage) + .setPath(targetFilePath) + .setBlockSize(blockSize) + .setReplication(replication) + .setBlockPadding(true) + .setEncodingLevel(encodingLevel) + .setNullsPadding(nullsPadding) + .setCompressionBlockSize(1) + .build(); + + currFile = openTmpFile(targetFileName, currTargetPath); + tmpFiles.add(currFile); + rowCounter = 0; + } + + private void closePixelsFile() throws IOException + { + if (rowBatch.size != 0) + { + pixelsWriter.addRowBatch(rowBatch); + rowBatch.reset(); + } + closeWriterAndAddFile(pixelsWriter, currFile, currTargetPath, null); + + // Reset state for next file + pixelsWriter = null; + currFile = null; + rowCounter = 0; + } + } \ No newline at end of file diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/RowIdAllocator.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/RowIdAllocator.java index 454ccf424f..579927386e 100644 --- a/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/RowIdAllocator.java +++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/index/RowIdAllocator.java @@ -51,6 +51,17 @@ public RowIdAllocator(long tableId, int batchSize, IndexServiceProvider.ServiceM this.indexService = IndexServiceProvider.getService(mode); } + public RowIdAllocator(long tableId, int batchSize, IndexService indexService) + { + if (batchSize <= 0) + { + throw new IllegalArgumentException("batchSize must be positive."); + } + this.tableId = tableId; + this.batchSize = batchSize; + this.indexService = indexService; + } + /** * Get a unique rowId. * @return diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/node/BucketCache.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/node/BucketCache.java new file mode 100644 index 0000000000..ea0ca460d4 --- /dev/null +++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/node/BucketCache.java @@ -0,0 +1,146 @@ + /* + * Copyright 2025 PixelsDB. + * + * This file is part of Pixels. + * + * Pixels is free software: you can redistribute it and/or modify + * it under the terms of the Affero GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * Pixels is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Affero GNU General Public License for more details. + * + * You should have received a copy of the Affero GNU General Public + * License along with Pixels. If not, see + * . + */ + + package io.pixelsdb.pixels.common.node; + +import com.google.common.hash.Hashing; +import com.google.protobuf.ByteString; +import io.pixelsdb.pixels.common.utils.ConfigFactory; +import io.pixelsdb.pixels.daemon.NodeProto; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Component responsible for managing the cache of bucketId to RetinaNodeInfo mappings. + * It uses the Singleton pattern and lazy initialization to ensure a single instance + * and deferred creation. + * * NOTE: The cache invalidation logic (when the hash ring changes) needs to be integrated + * with NodeServiceImpl, which is simplified in this example. + */ +public class BucketCache +{ + + // Lock object for thread-safe singleton initialization + private static final Object lock = new Object(); + // Lazy-loaded Singleton instance + private static volatile BucketCache instance; + // Thread-safe map cache: Key: bucketId (0 to bucketNum - 1), Value: RetinaNodeInfo + private final Map bucketToNodeMap; + + // NodeService client stub (would be used for actual RPC calls in a real application) + // private final NodeServiceGrpc.NodeServiceBlockingStub nodeServiceStub; + // The total number of discrete hash points (M) loaded from configuration + private final int bucketNum; + private final NodeService nodeService; + + /** + * Private constructor to enforce the Singleton pattern. + */ + private BucketCache() + { + // In a real application, bucketNum should be fetched from ConfigFactory + ConfigFactory config = ConfigFactory.Instance(); + this.bucketNum = Integer.parseInt(config.getProperty("node.bucket.num")); + + // Initialize the cache structure + this.bucketToNodeMap = new ConcurrentHashMap<>(bucketNum); + this.nodeService = NodeService.Instance(); + } + + /** + * Retrieves the singleton instance of BucketToNodeCache. Uses double-checked + * locking for thread-safe lazy initialization. + * * @return The BucketToNodeCache instance + */ + public static BucketCache getInstance() + { + if (instance == null) + { + synchronized (lock) + { + if (instance == null) + { + instance = new BucketCache(); + } + } + } + return instance; + } + + /** + * Calculates the bucketId on the hash ring for the input data. + * Uses MurmurHash3_32 and the modulo operation to constrain the result to the + * discrete range [0, bucketNum - 1]. + * * @param byteString The input data to be hashed + * + * @return The calculated bucketId + */ + public static int getBucketIdFromByteBuffer(ByteString byteString) + { + // Get the singleton instance to access bucketNum + BucketCache cacheInstance = getInstance(); + + // 1. Calculate the hash using MurmurHash3_32 + int hash = Hashing.murmur3_32_fixed() + .hashBytes(byteString.toByteArray()) + .asInt(); + + // 2. Take the absolute value (MurmurHash3_32 can return negative integers) + int absHash = Math.abs(hash); + + // 3. Apply modulo operation to compress the hash value to the range [0, bucketNum - 1] + return absHash % cacheInstance.bucketNum; + } + + /** + * Core lookup method: Retrieves the corresponding RetinaNodeInfo for a given bucketId. + * Uses a cache-aside strategy (lazy loading) to populate the cache upon miss. + * * @param bucketId The hash bucket ID of the data (range 0 to bucketNum - 1) + * + * @return The corresponding RetinaNodeInfo, or null if lookup fails + */ + public NodeProto.NodeInfo getRetinaNodeInfoByBucketId(int bucketId) + { + // 1. Try to get from cache + NodeProto.NodeInfo nodeInfo = bucketToNodeMap.get(bucketId); + if (nodeInfo != null) + { + return nodeInfo; + } + + // 2. Cache miss: Fetch from the authoritative source (NodeService RPC) + NodeProto.NodeInfo fetchedNodeInfo = fetchNodeInfoFromNodeService(bucketId); + + if (fetchedNodeInfo != null) + { + // 3. Put into cache + bucketToNodeMap.put(bucketId, fetchedNodeInfo); + return fetchedNodeInfo; + } + + return null; + } + + private NodeProto.NodeInfo fetchNodeInfoFromNodeService(int bucketId) + { + return nodeService.getRetinaByBucket(bucketId); + } +} diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/Constants.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/Constants.java index 7f2a6db759..8753d7e437 100644 --- a/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/Constants.java +++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/Constants.java @@ -89,4 +89,6 @@ public final class Constants public static final String AI_ROW_ID_PREFIX = "row_id_"; public static final String CF_OUTPUT_STATE_KEY_PREFIX = "pixels_turbo_cf_output"; + + public static final String LOAD_DEFAULT_RETINA_PREFIX = "default_retina_"; } diff --git a/pixels-common/src/main/resources/pixels.properties b/pixels-common/src/main/resources/pixels.properties index b4f42cbc52..e0aa82da43 100644 --- a/pixels-common/src/main/resources/pixels.properties +++ b/pixels-common/src/main/resources/pixels.properties @@ -73,7 +73,7 @@ node.server.port=18891 node.server.host=localhost # number of virtual nodes per physical node (used in consistent hashing) node.virtual.num=16 -node.bucket.num=1024 +node.bucket.num=128 ###### storage engine settings ###### diff --git a/pixels-common/src/test/java/io/pixelsdb/pixels/common/node/TestBucketCache.java b/pixels-common/src/test/java/io/pixelsdb/pixels/common/node/TestBucketCache.java new file mode 100644 index 0000000000..d624741f8a --- /dev/null +++ b/pixels-common/src/test/java/io/pixelsdb/pixels/common/node/TestBucketCache.java @@ -0,0 +1,68 @@ + /* + * Copyright 2025 PixelsDB. + * + * This file is part of Pixels. + * + * Pixels is free software: you can redistribute it and/or modify + * it under the terms of the Affero GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * Pixels is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Affero GNU General Public License for more details. + * + * You should have received a copy of the Affero GNU General Public + * License along with Pixels. If not, see + * . + */ + + package io.pixelsdb.pixels.common.node; + +import io.pixelsdb.pixels.common.utils.ConfigFactory; +import io.pixelsdb.pixels.daemon.NodeProto; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + + public class TestBucketCache +{ + @Test + public void testBucketCache() + { + BucketCache bucketCache = BucketCache.getInstance(); + NodeProto.NodeInfo nodeInfo0 = bucketCache.getRetinaNodeInfoByBucketId(0); + Assertions.assertNotNull(nodeInfo0); + NodeProto.NodeInfo nodeInfo1 = bucketCache.getRetinaNodeInfoByBucketId(1); + Assertions.assertNotNull(nodeInfo1); + NodeProto.NodeInfo nodeInfo0_cache = bucketCache.getRetinaNodeInfoByBucketId(0); + Assertions.assertEquals(nodeInfo0, nodeInfo0_cache); + } + + @Test + public void testBucketStats() + { + int bucketNum = Integer.parseInt( + ConfigFactory.Instance().getProperty("node.bucket.num") + ); + + BucketCache bucketCache = BucketCache.getInstance(); + Map bucketStats = new HashMap<>(); + + for (int i = 0; i < bucketNum; ++i) + { + NodeProto.NodeInfo node = bucketCache.getRetinaNodeInfoByBucketId(i); + String addr = node.getAddress(); + + bucketStats.compute(addr, (k, v) -> v == null ? 1 : v + 1); + } + + for (Map.Entry entry : bucketStats.entrySet()) { + System.out.println(entry.getKey() + " => " + entry.getValue()); + } + } + +} diff --git a/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/metadata/MetadataServiceImpl.java b/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/metadata/MetadataServiceImpl.java index 09e3dd96f3..1e3eb125f4 100644 --- a/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/metadata/MetadataServiceImpl.java +++ b/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/metadata/MetadataServiceImpl.java @@ -24,12 +24,14 @@ import com.google.protobuf.ProtocolStringList; import io.grpc.stub.StreamObserver; import io.pixelsdb.pixels.common.metadata.domain.*; +import io.pixelsdb.pixels.common.node.NodeService; import io.pixelsdb.pixels.common.physical.Storage; import io.pixelsdb.pixels.common.retina.RetinaService; import io.pixelsdb.pixels.common.utils.ConfigFactory; import io.pixelsdb.pixels.core.TypeDescription; import io.pixelsdb.pixels.daemon.MetadataProto; import io.pixelsdb.pixels.daemon.MetadataServiceGrpc; +import io.pixelsdb.pixels.daemon.NodeProto; import io.pixelsdb.pixels.daemon.metadata.dao.*; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -354,15 +356,22 @@ public void createTable(MetadataProto.CreateTableRequest request, if (allSuccess) { // Issue #930: corresponding writerBuffer was not created when creating table - RetinaService retinaService = RetinaService.Instance(); - if (!retinaService.addWriterBuffer(request.getSchemaName(), request.getTableName())) + // Issue #1218: support multi-retina + NodeService nodeService = NodeService.Instance(); + List retinaList = nodeService.getRetinaList(); + int retinaPort = Integer.parseInt(ConfigFactory.Instance().getProperty("retina.server.port")); + for(NodeProto.NodeInfo retinaNode : retinaList) { - headerBuilder.setErrorCode(METADATA_ADD_RETINA_BUFFER_FAILED) - .setErrorMsg("failed to add retina's writer buffer for table '" + - request.getSchemaName() + "." + request.getTableName() + "'"); - } else - { - headerBuilder.setErrorCode(SUCCESS).setErrorMsg(""); + RetinaService retinaService = RetinaService.CreateInstance(retinaNode.getAddress(), retinaPort); + if (!retinaService.addWriterBuffer(request.getSchemaName(), request.getTableName())) + { + headerBuilder.setErrorCode(METADATA_ADD_RETINA_BUFFER_FAILED) + .setErrorMsg("failed to add retina's writer buffer for table '" + + request.getSchemaName() + "." + request.getTableName() + "'"); + } else + { + headerBuilder.setErrorCode(SUCCESS).setErrorMsg(""); + } } } else { diff --git a/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/node/NodeServiceImpl.java b/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/node/NodeServiceImpl.java index b833bdc1e9..c077b6053a 100644 --- a/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/node/NodeServiceImpl.java +++ b/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/node/NodeServiceImpl.java @@ -154,18 +154,18 @@ private void updateHashRing(List currentNodes) // 2. Collect addresses currently on the ring Set existingAddresses = new HashSet<>(); - for (NodeProto.NodeInfo node : new TreeSet<>(hashRing.values())) + for (NodeProto.NodeInfo node : new HashSet<>(hashRing.values())) { existingAddresses.add(node.getAddress()); } // 3. Remove nodes not present anymore (existingAddresses - newAddresses) - for (NodeProto.NodeInfo node : new TreeSet<>(hashRing.values())) + for (String address : existingAddresses) { - if (!newAddresses.contains(node.getAddress())) + if (!newAddresses.contains(address)) { - removeNodeInternal(node); - logger.warn("Removed node from hash ring: " + node.getAddress()); + removeAddressInternal(address); + logger.warn("Removed node from hash ring: " + address); } } @@ -201,19 +201,26 @@ private void addNodeInternal(NodeProto.NodeInfo node) } /** - * Remove a node from the hash ring internally. + * Remove address from the hash ring internally. */ - private void removeNodeInternal(NodeProto.NodeInfo node) + private void removeAddressInternal(String address) { // Recalculate and remove all virtual nodes for (int i = 0; i < bucketNum; i++) { - int hashPoint = hash(node.getAddress() + "#" + i) % bucketNum; + int hashPoint = hash(address + "#" + i) % bucketNum; hashRing.remove(hashPoint); } // Note: The removal above is technically incomplete if multiple virtual nodes map to the same hashPoint, // but given the requirement, we assume hash() provides a reasonably uniform spread. } + /** + * Remove a node from the hash ring internally. + */ + private void removeNodeInternal(NodeProto.NodeInfo node) + { + removeAddressInternal(node.getAddress()); + } /** * Simple hash function for consistent hashing. diff --git a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/FileWriterManager.java b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/FileWriterManager.java index 72c0297c02..b8214fcdba 100644 --- a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/FileWriterManager.java +++ b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/FileWriterManager.java @@ -69,13 +69,13 @@ public FileWriterManager(long tableId, TypeDescription schema, Path targetOrderedDirPath, Storage targetOrderedStorage, int pixelsStride, long blockSize, short replication, EncodingLevel encodingLevel, boolean nullsPadding, - long firstBlockId, int recordNum) throws RetinaException + long firstBlockId, int recordNum, String hostName) throws RetinaException { this.tableId = tableId; this.firstBlockId = firstBlockId; // Create pixels writer. - String targetFileName = DateUtil.getCurTime() + ".pxl"; + String targetFileName = hostName + "_" + DateUtil.getCurTime() + ".pxl"; String targetFilePath = targetOrderedDirPath.getUri() + "/" + targetFileName; try { @@ -90,7 +90,8 @@ public FileWriterManager(long tableId, TypeDescription schema, this.file.setId(metadataService.getFileId(targetFilePath)); } catch (MetadataException e) { - throw new RetinaException("Failed to add file information to the metadata", e); + throw new RetinaException("Failed to add file information to the metadata, " + + "targetFilePath: " + targetFilePath, e); } // Add the corresponding visibility for the file. diff --git a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/ObjectStorageManager.java b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/ObjectStorageManager.java index 0d2b4bec11..4c58a4499a 100644 --- a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/ObjectStorageManager.java +++ b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/ObjectStorageManager.java @@ -34,6 +34,7 @@ public class ObjectStorageManager private static volatile ObjectStorageManager instance; private final Storage storage; private final String path; + private String idPrefix; private ObjectStorageManager() throws RetinaException { @@ -82,7 +83,12 @@ public static ObjectStorageManager Instance() throws RetinaException private String buildKey(long tableId, long entryId) { - return this.path + String.format("%d/%d", tableId, entryId); + return this.path + String.format("%d/%s%d", tableId, idPrefix, entryId); + } + + public void setIdPrefix(String idPrefix) + { + this.idPrefix = idPrefix; } public void write(long tableId, long entryId, byte[] data) throws RetinaException diff --git a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/PixelsWriterBuffer.java b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/PixelsWriterBuffer.java index f9cf073751..607cb41dc5 100644 --- a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/PixelsWriterBuffer.java +++ b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/PixelsWriterBuffer.java @@ -111,9 +111,10 @@ public class PixelsWriterBuffer private final List fileWriterManagers; private FileWriterManager currentFileWriterManager; private AtomicLong maxObjectKey; + private String retinaHostName; public PixelsWriterBuffer(long tableId, TypeDescription schema, Path targetOrderedDirPath, - Path targetCompactDirPath) throws RetinaException + Path targetCompactDirPath, String retinaHostName) throws RetinaException { this.tableId = tableId; this.schema = schema; @@ -146,13 +147,15 @@ public PixelsWriterBuffer(long tableId, TypeDescription schema, Path targetOrder this.fileWriterManagers = new ArrayList<>(); this.maxObjectKey = new AtomicLong(-1); + this.retinaHostName = retinaHostName; this.objectStorageManager = ObjectStorageManager.Instance(); + this.objectStorageManager.setIdPrefix(retinaHostName + "_"); this.currentFileWriterManager = new FileWriterManager( this.tableId, this.schema, this.targetOrderedDirPath, this.targetOrderedStorage, this.memTableSize, this.blockSize, this.replication, this.encodingLevel, this.nullsPadding, - idCounter, this.memTableSize * this.maxMemTableCount); + idCounter, this.memTableSize * this.maxMemTableCount, retinaHostName); this.activeMemTable = new MemTable(this.idCounter, schema, memTableSize, TypeDescription.Mode.CREATE_INT_VECTOR_FOR_INT, this.currentFileWriterManager.getFileId(), @@ -236,7 +239,7 @@ private void switchMemTable() throws RetinaException this.targetOrderedDirPath, this.targetOrderedStorage, this.memTableSize, this.blockSize, this.replication, this.encodingLevel, this.nullsPadding, this.idCounter, - this.memTableSize * this.maxMemTableCount); + this.memTableSize * this.maxMemTableCount, this.retinaHostName); } /* diff --git a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java index 2faeacfb36..f5b1dd5d0e 100644 --- a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java +++ b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/RetinaResourceManager.java @@ -38,6 +38,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.*; @@ -56,6 +58,7 @@ public class RetinaResourceManager private final MetadataService metadataService; private final Map rgVisibilityMap; private final Map pixelsWriterBufferMap; + private String retinaHostName; // GC related fields private final ScheduledExecutorService gcExecutor; @@ -89,6 +92,19 @@ private RetinaResourceManager() logger.error("Failed to start retina background gc", e); } this.gcExecutor = executor; + + this.retinaHostName = System.getenv("HOSTNAME"); + if (retinaHostName == null) + { + try + { + this.retinaHostName = InetAddress.getLocalHost().getHostName(); + logger.debug("HostName from InetAddress: {}", retinaHostName); + } catch (UnknownHostException e) + { + logger.error("Failed to get retina hostname", e); + } + } } private static final class InstanceHolder @@ -184,7 +200,7 @@ public void addWriterBuffer(String schemaName, String tableName) throws RetinaEx TypeDescription schema = TypeDescription.createSchemaFromStrings(columnNames, columnTypes); PixelsWriterBuffer pixelsWriterBuffer = new PixelsWriterBuffer(latestLayout.getTableId(), - schema, orderedPaths.get(0), compactPaths.get(0)); + schema, orderedPaths.get(0), compactPaths.get(0), retinaHostName); String writerBufferKey = schemaName + "_" + tableName; pixelsWriterBufferMap.put(writerBufferKey, pixelsWriterBuffer); } catch (Exception e) diff --git a/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestPixelsWriterBuffer.java b/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestPixelsWriterBuffer.java index bb4c095ede..1231eab213 100644 --- a/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestPixelsWriterBuffer.java +++ b/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestPixelsWriterBuffer.java @@ -58,7 +58,7 @@ public void setup() columnTypes.add("int"); schema = TypeDescription.createSchemaFromStrings(columnNames, columnTypes); - buffer = new PixelsWriterBuffer(0L, schema, targetOrderDirPath, targetCompactDirPath); // table id get from mysql `TBLS` table + buffer = new PixelsWriterBuffer(0L, schema, targetOrderDirPath, targetCompactDirPath, "localhost"); // table id get from mysql `TBLS` table } catch (Exception e) { System.out.println("setup error: " + e); diff --git a/scripts/sbin/start-retina.sh b/scripts/sbin/start-retina.sh index 48ddbb033f..b8ba241b65 100755 --- a/scripts/sbin/start-retina.sh +++ b/scripts/sbin/start-retina.sh @@ -5,11 +5,13 @@ if [ -z "$PIXELS_HOME" ]; then exit 1 fi -# Remember to set `hostnames` and `pixels_home[Optional]` in config file `$PIXELS_HOME/etc/retina` +# Remember to set `hostnames` and `pixels_home[Optional]` in config file `$PIXELS_HOME/etc/workers` DEFAULT_PIXELS_HOME=$PIXELS_HOME -read -r retina home < $PIXELS_HOME/etc/retina -home="${home:-${DEFAULT_PIXELS_HOME}}" -REMOTE_SCRIPT="export PIXELS_HOME=${home} && $PIXELS_HOME/bin/start-daemon.sh retina -daemon" -echo "Starting retina on ${retina}..." -ssh -n "${retina}" "${REMOTE_SCRIPT}" +while read -r retina home +do + home="${home:-${DEFAULT_PIXELS_HOME}}" + REMOTE_SCRIPT="export PIXELS_HOME=${home} && $PIXELS_HOME/bin/start-daemon.sh retina -daemon" + echo "Starting retina on ${retina}..." + ssh -n "${retina}" "${REMOTE_SCRIPT}" +done < $PIXELS_HOME/etc/retina diff --git a/scripts/sbin/stop-retina.sh b/scripts/sbin/stop-retina.sh index aa36fb6ecb..98a8f2fc39 100755 --- a/scripts/sbin/stop-retina.sh +++ b/scripts/sbin/stop-retina.sh @@ -5,11 +5,13 @@ if [ -z "$PIXELS_HOME" ]; then exit 1 fi -# Remember to set `hostnames` and `pixels_home[Optional]` in config file `$PIXELS_HOME/etc/retina` +# Remember to set `hostnames` and `pixels_home[Optional]` in config file `$PIXELS_HOME/etc/workers` DEFAULT_PIXELS_HOME=$PIXELS_HOME -read -r retina home < $PIXELS_HOME/etc/retina -home="${home:-${DEFAULT_PIXELS_HOME}}" -REMOTE_SCRIPT="export PIXELS_HOME=${home} && $PIXELS_HOME/bin/stop-daemon.sh retina -daemon" -echo "Stop retina on ${retina}." -ssh -n "${retina}" "${REMOTE_SCRIPT}" +while read -r retina home +do + home="${home:-${DEFAULT_PIXELS_HOME}}" + REMOTE_SCRIPT="export PIXELS_HOME=${home} && $PIXELS_HOME/bin/stop-daemon.sh retina -daemon" + echo "Stop retina on ${retina}." + ssh -n "${retina}" "${REMOTE_SCRIPT}" +done < $PIXELS_HOME/etc/retina