diff --git a/LICENSE b/LICENSE
index 1fc316248ac3..7c5f46ffe4c2 100644
--- a/LICENSE
+++ b/LICENSE
@@ -213,8 +213,6 @@
 paimon-common/src/main/java/org/apache/paimon/fs/FileRange.java
 paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadUtils.java
 from http://hadoop.apache.org/ version 2.10.2
-paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreWriter.java
-paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreReader.java
 paimon-common/src/main/java/org/apache/paimon/utils/VarLengthIntUtils.java
 from https://github.com/linkedin/PalDB version 1.2.0
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 7f7bb6816d92..0b7fcca75f9c 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -693,12 +693,6 @@
             <td>Float</td>
             <td>The index load factor for lookup.</td>
         </tr>
-        <tr>
-            <td><h5>lookup.local-file-type</h5></td>
-            <td style="word-wrap: break-word;">sort</td>
-            <td><p>Enum</p></td>
-            <td>The local file type for lookup.<br /><br />Possible values:<ul><li>"sort": Construct a sorted file for lookup.</li><li>"hash": Construct a hash file for lookup.</li></ul></td>
-        </tr>
         <tr>
            <td><h5>lookup.merge-buffer-size</h5></td>
            <td style="word-wrap: break-word;">
8 mb diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 86800170275c..c90760183de1 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -1121,12 +1121,6 @@ public InlineElement getDescription() { .withDescription( "Define partition by table options, cannot define partition on DDL and table options at the same time."); - public static final ConfigOption LOOKUP_LOCAL_FILE_TYPE = - key("lookup.local-file-type") - .enumType(LookupLocalFileType.class) - .defaultValue(LookupLocalFileType.SORT) - .withDescription("The local file type for lookup."); - public static final ConfigOption LOOKUP_HASH_LOAD_FACTOR = key("lookup.hash-load-factor") .floatType() @@ -2465,10 +2459,6 @@ public int cachePageSize() { return (int) options.get(CACHE_PAGE_SIZE).getBytes(); } - public LookupLocalFileType lookupLocalFileType() { - return options.get(LOOKUP_LOCAL_FILE_TYPE); - } - public MemorySize lookupCacheMaxMemory() { return options.get(LOOKUP_CACHE_MAX_MEMORY_SIZE); } @@ -3815,32 +3805,6 @@ public InlineElement getDescription() { } } - /** Specifies the local file type for lookup. */ - public enum LookupLocalFileType implements DescribedEnum { - SORT("sort", "Construct a sorted file for lookup."), - - HASH("hash", "Construct a hash file for lookup."); - - private final String value; - - private final String description; - - LookupLocalFileType(String value, String description) { - this.value = value; - this.description = description; - } - - @Override - public String toString() { - return value; - } - - @Override - public InlineElement getDescription() { - return text(description); - } - } - /** The time unit of materialized table freshness. 
*/ public enum MaterializedTableIntervalFreshnessTimeUnit { SECOND, diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/AbstractLookupBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/AbstractLookupBenchmark.java index 653bfee6cc00..e7b5412719d4 100644 --- a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/AbstractLookupBenchmark.java +++ b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/AbstractLookupBenchmark.java @@ -28,7 +28,6 @@ import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.BloomFilter; -import org.apache.paimon.utils.Pair; import java.io.File; import java.io.IOException; @@ -84,7 +83,7 @@ protected byte[] intToByteArray(int value) { return keySerializer.serializeToBytes(reusedKey); } - protected Pair writeData( + protected String writeData( Path tempDir, CoreOptions options, byte[][] inputs, @@ -102,9 +101,7 @@ protected Pair writeData( new CacheManager(MemorySize.ofMebiBytes(10)), keySerializer.createSliceComparator()); - String name = - String.format( - "%s-%s-%s", options.lookupLocalFileType(), valueLength, bloomFilterEnabled); + String name = String.format("%s-%s", valueLength, bloomFilterEnabled); File file = new File(tempDir.toFile(), UUID.randomUUID() + "-" + name); LookupStoreWriter writer = factory.createWriter(file, createBloomFiler(bloomFilterEnabled)); int i = 0; @@ -120,8 +117,8 @@ protected Pair writeData( i = (i + 1) % 2; } } - LookupStoreFactory.Context context = writer.close(); - return Pair.of(file.getAbsolutePath(), context); + writer.close(); + return file.getAbsolutePath(); } private BloomFilter.Builder createBloomFiler(boolean enabled) { diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupBloomFilterBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupBloomFilterBenchmark.java deleted file mode 100644 index c15765d41426..000000000000 --- a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupBloomFilterBenchmark.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.paimon.benchmark.lookup; - -import org.apache.paimon.benchmark.Benchmark; -import org.apache.paimon.compression.CompressOptions; -import org.apache.paimon.io.cache.CacheManager; -import org.apache.paimon.lookup.hash.HashLookupStoreFactory; -import org.apache.paimon.lookup.hash.HashLookupStoreReader; -import org.apache.paimon.lookup.hash.HashLookupStoreWriter; -import org.apache.paimon.options.MemorySize; -import org.apache.paimon.utils.BloomFilter; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; - -import java.io.File; -import java.io.IOException; -import java.nio.file.Path; -import java.util.Arrays; -import java.util.UUID; - -/** Benchmark for measure the bloom filter for lookup. */ -public class LookupBloomFilterBenchmark extends AbstractLookupBenchmark { - - @TempDir Path tempDir; - - @Test - public void testHighMatch() throws Exception { - innerTest("lookup", generateSequenceInputs(0, 100000), generateRandomInputs(0, 100000)); - } - - @Test - public void testHalfMatch() throws Exception { - innerTest("lookup", generateSequenceInputs(0, 100000), generateRandomInputs(50000, 150000)); - } - - @Test - public void testLowMatch() throws Exception { - innerTest( - "lookup", generateSequenceInputs(0, 100000), generateRandomInputs(100000, 200000)); - } - - public void innerTest(String name, byte[][] inputs, byte[][] probe) throws Exception { - Benchmark benchmark = - new Benchmark(name, probe.length).setNumWarmupIters(1).setOutputPerIteration(true); - - for (int valueLength : VALUE_LENGTHS) { - HashLookupStoreReader reader = writeData(null, inputs, valueLength); - - benchmark.addCase( - String.format("bf-disabled-%dB-value", valueLength), - 5, - () -> { - try { - for (byte[] key : probe) { - reader.lookup(key); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - - HashLookupStoreReader reader2 = - writeData(BloomFilter.builder(inputs.length, 0.05), inputs, valueLength); - - benchmark.addCase( - String.format("bf-enabled-%dB-value", valueLength), - 5, - () -> { - try { - for (byte[] key : probe) { - reader2.lookup(key); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - }); - } - - benchmark.run(); - } - - private HashLookupStoreReader writeData( - BloomFilter.Builder filter, byte[][] inputs, int valueLength) throws IOException { - byte[] value = new byte[valueLength]; - Arrays.fill(value, (byte) 1); - HashLookupStoreFactory factory = - new HashLookupStoreFactory( - new CacheManager(MemorySize.ofMebiBytes(10)), - 16 * 1024, - 0.75, - new CompressOptions("none", 1)); - - File file = new File(tempDir.toFile(), UUID.randomUUID().toString()); - HashLookupStoreWriter writer = factory.createWriter(file, filter); - for (byte[] input : inputs) { - writer.put(input, value); - } - return factory.createReader(file, writer.close()); - } -} diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupReaderBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupReaderBenchmark.java index 2d8de84327d4..811ec7c984d4 100644 --- a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupReaderBenchmark.java +++ b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupReaderBenchmark.java @@ -29,7 +29,6 @@ import org.apache.paimon.testutils.junit.parameterized.Parameters; import org.apache.paimon.types.IntType; import 
org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.Pair;

 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -38,10 +37,9 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;

-import static org.apache.paimon.CoreOptions.LOOKUP_LOCAL_FILE_TYPE;
 import static org.assertj.core.api.Assertions.assertThat;

 /** Benchmark for measuring the throughput of reading for lookup. */
 @ExtendWith(ParameterizedTestExtension.class)
 public class LookupReaderBenchmark extends AbstractLookupBenchmark {
@@ -87,43 +85,26 @@ private void readLookupDataBenchmark(byte[][] inputs, byte[][] randomInputs, boo
                         .setNumWarmupIters(1)
                         .setOutputPerIteration(true);
         for (int valueLength : VALUE_LENGTHS) {
-            for (CoreOptions.LookupLocalFileType fileType :
-                    CoreOptions.LookupLocalFileType.values()) {
-                CoreOptions options =
-                        CoreOptions.fromMap(
-                                Collections.singletonMap(
-                                        LOOKUP_LOCAL_FILE_TYPE.key(), fileType.name()));
-                Pair<String, LookupStoreFactory.Context> pair =
-                        writeData(tempDir, options, inputs, valueLength, false, bloomFilterEnabled);
-                benchmark.addCase(
-                        String.format(
-                                "%s-read-%dB-value-%d-num",
-                                fileType.name(), valueLength, randomInputs.length),
-                        5,
-                        () -> {
-                            try {
-                                readData(
-                                        options,
-                                        randomInputs,
-                                        pair.getLeft(),
-                                        pair.getRight(),
-                                        nullResult);
-                            } catch (IOException e) {
-                                throw new RuntimeException(e);
-                            }
-                        });
-            }
+            CoreOptions options = new CoreOptions(new HashMap<>());
+            String path =
+                    writeData(tempDir, options, inputs, valueLength, false, bloomFilterEnabled);
+            benchmark.addCase(
+                    String.format("read-%dB-value-%d-num", valueLength, randomInputs.length),
+                    5,
+                    () -> {
+                        try {
+                            readData(options, randomInputs, path, nullResult);
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
         }
         benchmark.run();
     }

     private void readData(
-            CoreOptions options,
-            byte[][] randomInputs,
-            String filePath,
-            LookupStoreFactory.Context context,
-            boolean nullResult)
+            CoreOptions options, byte[][] randomInputs, String filePath, boolean nullResult)
             throws IOException {
         LookupStoreFactory factory =
                 LookupStoreFactory.create(
@@ -133,7 +114,7 @@ private void readData(
                                 .createSliceComparator());

         File file = new File(filePath);
-        LookupStoreReader reader = factory.createReader(file, context);
+        LookupStoreReader reader = factory.createReader(file);
         for (byte[] input : randomInputs) {
             if (nullResult) {
                 assertThat(reader.lookup(input)).isNull();
diff --git a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupWriterBenchmark.java b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupWriterBenchmark.java
index a590866ef175..84a1bfca1ca5 100644
--- a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupWriterBenchmark.java
+++ b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/lookup/LookupWriterBenchmark.java
@@ -29,11 +29,9 @@
 import java.io.IOException;
 import java.nio.file.Path;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;

-import static org.apache.paimon.CoreOptions.LOOKUP_LOCAL_FILE_TYPE;
-
 /** Benchmark for measuring the throughput of writing for lookup.
 */
 @ExtendWith(ParameterizedTestExtension.class)
 public class LookupWriterBenchmark extends AbstractLookupBenchmark {
@@ -67,31 +65,23 @@ private void writeLookupDataBenchmark(byte[][] inputs, boolean sameValue) {
                         .setNumWarmupIters(1)
                         .setOutputPerIteration(true);
         for (int valueLength : VALUE_LENGTHS) {
-            for (CoreOptions.LookupLocalFileType fileType :
-                    CoreOptions.LookupLocalFileType.values()) {
-                CoreOptions options =
-                        CoreOptions.fromMap(
-                                Collections.singletonMap(
-                                        LOOKUP_LOCAL_FILE_TYPE.key(), fileType.name()));
-                benchmark.addCase(
-                        String.format(
-                                "%s-write-%dB-value-%d-num",
-                                fileType.name(), valueLength, inputs.length),
-                        5,
-                        () -> {
-                            try {
-                                writeData(
-                                        tempDir,
-                                        options,
-                                        inputs,
-                                        valueLength,
-                                        sameValue,
-                                        bloomFilterEnabled);
-                            } catch (IOException e) {
-                                throw new RuntimeException(e);
-                            }
-                        });
-            }
+            CoreOptions options = new CoreOptions(new HashMap<>());
+            benchmark.addCase(
+                    String.format("write-%dB-value-%d-num", valueLength, inputs.length),
+                    5,
+                    () -> {
+                        try {
+                            writeData(
+                                    tempDir,
+                                    options,
+                                    inputs,
+                                    valueLength,
+                                    sameValue,
+                                    bloomFilterEnabled);
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                    });
         }
         benchmark.run();
diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreFactory.java b/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreFactory.java
index adee859162c0..c6861865e5a8 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreFactory.java
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreFactory.java
@@ -21,7 +21,6 @@
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.compression.CompressOptions;
 import org.apache.paimon.io.cache.CacheManager;
-import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
 import org.apache.paimon.lookup.sort.SortLookupStoreFactory;
 import org.apache.paimon.memory.MemorySlice;
 import org.apache.paimon.options.Options;
@@ -48,7 +47,7 @@ public interface LookupStoreFactory {
     LookupStoreWriter createWriter(File file, @Nullable BloomFilter.Builder bloomFilter)
             throws IOException;

-    LookupStoreReader createReader(File file, Context context) throws IOException;
+    LookupStoreReader createReader(File file) throws IOException;

     static Function<Long, BloomFilter.Builder> bfGenerator(Options options) {
         Function<Long, BloomFilter.Builder> bfGenerator = rowCount -> null;
@@ -68,20 +67,8 @@ static Function<Long, BloomFilter.Builder> bfGenerator(Options options) {
     static LookupStoreFactory create(
             CoreOptions options, CacheManager cacheManager, Comparator<MemorySlice> keyComparator) {
         CompressOptions compression = options.lookupCompressOptions();
-        switch (options.lookupLocalFileType()) {
-            case SORT:
-                return new SortLookupStoreFactory(
-                        keyComparator, cacheManager, options.cachePageSize(), compression);
-            case HASH:
-                return new HashLookupStoreFactory(
-                        cacheManager,
-                        options.cachePageSize(),
-                        options.toConfiguration().get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR),
-                        compression);
-            default:
-                throw new IllegalArgumentException(
-                        "Unsupported lookup local file type: " + options.lookupLocalFileType());
-        }
+        return new SortLookupStoreFactory(
+                keyComparator, cacheManager, options.cachePageSize(), compression);
     }

     /** Context between writer and reader. */
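
With this hunk, `LookupStoreFactory.create` unconditionally returns the sort-based implementation, `createReader` takes only the file, and (per the next hunk) `LookupStoreWriter.close()` no longer hands a `Context` over to the reader. A minimal usage sketch of the API after this patch; `options`, `cacheManager`, `keyComparator`, `file`, `keyBytes`, and `valueBytes` are assumed to exist in the caller's scope:

```java
// Sketch only: the factory is now always sort-based (SortLookupStoreFactory).
LookupStoreFactory factory = LookupStoreFactory.create(options, cacheManager, keyComparator);

// LookupStoreWriter now extends Closeable, so try-with-resources works, and
// close() no longer returns a LookupStoreFactory.Context.
try (LookupStoreWriter writer = factory.createWriter(file, null /* no bloom filter */)) {
    writer.put(keyBytes, valueBytes);
}

// The reader is built from the file alone; no writer-produced Context is needed.
LookupStoreReader reader = factory.createReader(file);
byte[] value = reader.lookup(keyBytes); // null when the key is absent
reader.close();
```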
diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreWriter.java b/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreWriter.java
index 60271ce40f6b..607681204d56 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreWriter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreWriter.java
@@ -18,15 +18,12 @@
 package org.apache.paimon.lookup;

-import org.apache.paimon.lookup.LookupStoreFactory.Context;
-
+import java.io.Closeable;
 import java.io.IOException;

 /** Writer to prepare binary file. */
-public interface LookupStoreWriter {
+public interface LookupStoreWriter extends Closeable {

     /** Put key value to store. */
     void put(byte[] key, byte[] value) throws IOException;
-
-    Context close() throws IOException;
 }
diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashContext.java b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashContext.java
deleted file mode 100644
index 3741fc6fd3e4..000000000000
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashContext.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.lookup.hash;
-
-import org.apache.paimon.lookup.LookupStoreFactory.Context;
-
-/** Context for {@link HashLookupStoreFactory}.
*/ -public class HashContext implements Context { - - // is bloom filter enabled - final boolean bloomFilterEnabled; - // expected entries for bloom filter - final long bloomFilterExpectedEntries; - // bytes for bloom filter - final int bloomFilterBytes; - - // Key count for each key length - final int[] keyCounts; - // Slot size for each key length - final int[] slotSizes; - // Number of slots for each key length - final int[] slots; - // Offset of the index for different key length - final int[] indexOffsets; - // Offset of the data for different key length - final long[] dataOffsets; - - final long uncompressBytes; - final long[] compressPages; - - public HashContext( - boolean bloomFilterEnabled, - long bloomFilterExpectedEntries, - int bloomFilterBytes, - int[] keyCounts, - int[] slotSizes, - int[] slots, - int[] indexOffsets, - long[] dataOffsets, - long uncompressBytes, - long[] compressPages) { - this.bloomFilterEnabled = bloomFilterEnabled; - this.bloomFilterExpectedEntries = bloomFilterExpectedEntries; - this.bloomFilterBytes = bloomFilterBytes; - this.keyCounts = keyCounts; - this.slotSizes = slotSizes; - this.slots = slots; - this.indexOffsets = indexOffsets; - this.dataOffsets = dataOffsets; - this.uncompressBytes = uncompressBytes; - this.compressPages = compressPages; - } - - public HashContext copy(long uncompressBytes, long[] compressPages) { - return new HashContext( - bloomFilterEnabled, - bloomFilterExpectedEntries, - bloomFilterBytes, - keyCounts, - slotSizes, - slots, - indexOffsets, - dataOffsets, - uncompressBytes, - compressPages); - } -} diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreFactory.java b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreFactory.java deleted file mode 100644 index 1dbf715a1e65..000000000000 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreFactory.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.lookup.hash; - -import org.apache.paimon.compression.BlockCompressionFactory; -import org.apache.paimon.compression.CompressOptions; -import org.apache.paimon.io.cache.CacheManager; -import org.apache.paimon.lookup.LookupStoreFactory; -import org.apache.paimon.utils.BloomFilter; - -import javax.annotation.Nullable; - -import java.io.File; -import java.io.IOException; - -/** A {@link LookupStoreFactory} which uses hash to lookup records on disk. 
*/ -public class HashLookupStoreFactory implements LookupStoreFactory { - - private final CacheManager cacheManager; - private final int cachePageSize; - private final double loadFactor; - @Nullable private final BlockCompressionFactory compressionFactory; - - public HashLookupStoreFactory( - CacheManager cacheManager, - int cachePageSize, - double loadFactor, - CompressOptions compression) { - this.cacheManager = cacheManager; - this.cachePageSize = cachePageSize; - this.loadFactor = loadFactor; - this.compressionFactory = BlockCompressionFactory.create(compression); - } - - @Override - public HashLookupStoreReader createReader(File file, Context context) throws IOException { - return new HashLookupStoreReader( - file, (HashContext) context, cacheManager, cachePageSize, compressionFactory); - } - - @Override - public HashLookupStoreWriter createWriter(File file, @Nullable BloomFilter.Builder bloomFilter) - throws IOException { - return new HashLookupStoreWriter( - loadFactor, file, bloomFilter, compressionFactory, cachePageSize); - } -} diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreReader.java b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreReader.java deleted file mode 100644 index 742cc1a60bbe..000000000000 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreReader.java +++ /dev/null @@ -1,285 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.lookup.hash; - -import org.apache.paimon.compression.BlockCompressionFactory; -import org.apache.paimon.io.PageFileInput; -import org.apache.paimon.io.cache.CacheManager; -import org.apache.paimon.io.cache.FileBasedRandomInputView; -import org.apache.paimon.lookup.LookupStoreReader; -import org.apache.paimon.utils.FileBasedBloomFilter; -import org.apache.paimon.utils.MurmurHashUtils; -import org.apache.paimon.utils.VarLengthIntUtils; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Arrays; -import java.util.Iterator; -import java.util.Map; - -/* This file is based on source code of StorageReader from the PalDB Project (https://github.com/linkedin/PalDB), licensed by the Apache - * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for - * additional information regarding copyright ownership. */ - -/** Internal read implementation for hash kv store. 
*/ -public class HashLookupStoreReader - implements LookupStoreReader, Iterable> { - - private static final Logger LOG = - LoggerFactory.getLogger(HashLookupStoreReader.class.getName()); - - // Key count for each key length - private final int[] keyCounts; - // Slot size for each key length - private final int[] slotSizes; - // Number of slots for each key length - private final int[] slots; - // Offset of the index for different key length - private final int[] indexOffsets; - // Offset of the data for different key length - private final long[] dataOffsets; - // File input view - private FileBasedRandomInputView inputView; - // Buffers - private final byte[] slotBuffer; - - @Nullable private FileBasedBloomFilter bloomFilter; - - HashLookupStoreReader( - File file, - HashContext context, - CacheManager cacheManager, - int cachePageSize, - @Nullable BlockCompressionFactory compressionFactory) - throws IOException { - // File path - if (!file.exists()) { - throw new FileNotFoundException("File " + file.getAbsolutePath() + " not found"); - } - - keyCounts = context.keyCounts; - slots = context.slots; - slotSizes = context.slotSizes; - int maxSlotSize = 0; - for (int slotSize : slotSizes) { - maxSlotSize = Math.max(maxSlotSize, slotSize); - } - slotBuffer = new byte[maxSlotSize]; - indexOffsets = context.indexOffsets; - dataOffsets = context.dataOffsets; - - LOG.info("Opening file {}", file.getName()); - - PageFileInput fileInput = - PageFileInput.create( - file, - cachePageSize, - compressionFactory, - context.uncompressBytes, - context.compressPages); - inputView = new FileBasedRandomInputView(fileInput, cacheManager); - - if (context.bloomFilterEnabled) { - bloomFilter = - new FileBasedBloomFilter( - fileInput, - cacheManager, - context.bloomFilterExpectedEntries, - 0, - context.bloomFilterBytes); - } - } - - @Override - public byte[] lookup(byte[] key) throws IOException { - int keyLength = key.length; - if (keyLength >= slots.length || keyCounts[keyLength] == 0) { - return null; - } - - int hashcode = MurmurHashUtils.hashBytes(key); - if (bloomFilter != null && !bloomFilter.testHash(hashcode)) { - return null; - } - - long hashPositive = hashcode & 0x7fffffff; - int numSlots = slots[keyLength]; - int slotSize = slotSizes[keyLength]; - int indexOffset = indexOffsets[keyLength]; - long dataOffset = dataOffsets[keyLength]; - - for (int probe = 0; probe < numSlots; probe++) { - long slot = (hashPositive + probe) % numSlots; - inputView.setReadPosition(indexOffset + slot * slotSize); - inputView.readFully(slotBuffer, 0, slotSize); - - long offset = VarLengthIntUtils.decodeLong(slotBuffer, keyLength); - if (offset == 0) { - return null; - } - if (isKey(slotBuffer, key)) { - return getValue(dataOffset + offset); - } - } - return null; - } - - private boolean isKey(byte[] slotBuffer, byte[] key) { - for (int i = 0; i < key.length; i++) { - if (slotBuffer[i] != key[i]) { - return false; - } - } - return true; - } - - private byte[] getValue(long offset) throws IOException { - inputView.setReadPosition(offset); - - // Get size of data - int size = VarLengthIntUtils.decodeInt(inputView); - - // Create output bytes - byte[] res = new byte[size]; - inputView.readFully(res); - return res; - } - - @Override - public void close() throws IOException { - if (bloomFilter != null) { - bloomFilter.close(); - } - inputView.close(); - inputView = null; - } - - @Override - public Iterator> iterator() { - return new StorageIterator(true); - } - - public Iterator> keys() { - return new StorageIterator(false); - } 
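
For readers of this removal: the deleted `lookup` above resolves a key by open addressing — hash the key, then linearly probe the slot table for that key length until it hits an empty slot (offset 0, which the writer reserves to mean "absent") or a slot whose key bytes match, in which case the stored offset locates the value in the data section. An illustrative, self-contained sketch of that probing scheme — not the removed class itself; the in-memory arrays and `Arrays.hashCode` stand in for the memory-mapped slot table and MurmurHash:

```java
import java.util.Arrays;

final class LinearProbingSketch {
    /** Simplified analogue of the removed HashLookupStoreReader#lookup. */
    static byte[] lookup(byte[] key, byte[][] slotKeys, long[] slotOffsets, byte[][] values) {
        int numSlots = slotKeys.length;
        long hash = Arrays.hashCode(key) & 0x7fffffffL; // non-negative hash, as in the original
        for (int probe = 0; probe < numSlots; probe++) {
            int slot = (int) ((hash + probe) % numSlots);
            if (slotOffsets[slot] == 0) {
                return null; // empty slot reached: key was never written (offset 0 is reserved)
            }
            if (Arrays.equals(slotKeys[slot], key)) {
                return values[(int) slotOffsets[slot]]; // match: offset addresses the value
            }
        }
        return null; // probed every slot without a match
    }
}
```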
- - private class StorageIterator implements Iterator> { - - private final FastEntry entry = new FastEntry(); - private final boolean withValue; - private int currentKeyLength = 0; - private byte[] currentSlotBuffer; - private long keyIndex; - private long keyLimit; - private long currentDataOffset; - private int currentIndexOffset; - - public StorageIterator(boolean value) { - withValue = value; - nextKeyLength(); - } - - private void nextKeyLength() { - for (int i = currentKeyLength + 1; i < keyCounts.length; i++) { - long c = keyCounts[i]; - if (c > 0) { - currentKeyLength = i; - keyLimit += c; - currentSlotBuffer = new byte[slotSizes[i]]; - currentIndexOffset = indexOffsets[i]; - currentDataOffset = dataOffsets[i]; - break; - } - } - } - - @Override - public boolean hasNext() { - return keyIndex < keyLimit; - } - - @Override - public FastEntry next() { - try { - inputView.setReadPosition(currentIndexOffset); - - long offset = 0; - while (offset == 0) { - inputView.readFully(currentSlotBuffer); - offset = VarLengthIntUtils.decodeLong(currentSlotBuffer, currentKeyLength); - currentIndexOffset += currentSlotBuffer.length; - } - - byte[] key = Arrays.copyOf(currentSlotBuffer, currentKeyLength); - byte[] value = null; - - if (withValue) { - long valueOffset = currentDataOffset + offset; - value = getValue(valueOffset); - } - - entry.set(key, value); - - if (++keyIndex == keyLimit) { - nextKeyLength(); - } - return entry; - } catch (IOException ex) { - throw new RuntimeException(ex); - } - } - - @Override - public void remove() { - throw new UnsupportedOperationException("Not supported yet."); - } - - private class FastEntry implements Map.Entry { - - private byte[] key; - private byte[] val; - - protected void set(byte[] k, byte[] v) { - this.key = k; - this.val = v; - } - - @Override - public byte[] getKey() { - return key; - } - - @Override - public byte[] getValue() { - return val; - } - - @Override - public byte[] setValue(byte[] value) { - throw new UnsupportedOperationException("Not supported."); - } - } - } -} diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreWriter.java b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreWriter.java deleted file mode 100644 index 3307377a2706..000000000000 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreWriter.java +++ /dev/null @@ -1,498 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.paimon.lookup.hash; - -import org.apache.paimon.compression.BlockCompressionFactory; -import org.apache.paimon.io.CompressedPageFileOutput; -import org.apache.paimon.io.PageFileOutput; -import org.apache.paimon.lookup.LookupStoreFactory.Context; -import org.apache.paimon.lookup.LookupStoreWriter; -import org.apache.paimon.utils.BloomFilter; -import org.apache.paimon.utils.MurmurHashUtils; -import org.apache.paimon.utils.VarLengthIntUtils; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nullable; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.MappedByteBuffer; -import java.nio.channels.FileChannel; -import java.text.DecimalFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.UUID; - -/* This file is based on source code of StorageWriter from the PalDB Project (https://github.com/linkedin/PalDB), licensed by the Apache - * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for - * additional information regarding copyright ownership. */ - -/** Internal write implementation for hash kv store. */ -public class HashLookupStoreWriter implements LookupStoreWriter { - - private static final Logger LOG = - LoggerFactory.getLogger(HashLookupStoreWriter.class.getName()); - - // load factor of hash map, default 0.75 - private final double loadFactor; - // Output - private final File tempFolder; - private final File outputFile; - // Index stream - private File[] indexFiles; - private DataOutputStream[] indexStreams; - // Data stream - private File[] dataFiles; - private DataOutputStream[] dataStreams; - // Cache last value - private byte[][] lastValues; - private int[] lastValuesLength; - // Data length - private long[] dataLengths; - // Max offset length - private int[] maxOffsetLengths; - // Number of keys - private int keyCount; - private int[] keyCounts; - // Number of values - private int valueCount; - // Number of collisions - private int collisions; - - @Nullable private final BloomFilter.Builder bloomFilter; - - @Nullable private final BlockCompressionFactory compressionFactory; - private final int compressPageSize; - - HashLookupStoreWriter( - double loadFactor, - File file, - @Nullable BloomFilter.Builder bloomFilter, - @Nullable BlockCompressionFactory compressionFactory, - int compressPageSize) - throws IOException { - this.loadFactor = loadFactor; - this.outputFile = file; - this.compressionFactory = compressionFactory; - this.compressPageSize = compressPageSize; - if (loadFactor <= 0.0 || loadFactor >= 1.0) { - throw new IllegalArgumentException( - "Illegal load factor = " + loadFactor + ", should be between 0.0 and 1.0."); - } - - this.tempFolder = new File(file.getParentFile(), UUID.randomUUID().toString()); - if (!tempFolder.mkdir()) { - throw new IOException("Can not create temp folder: " + tempFolder); - } - this.indexStreams = new DataOutputStream[0]; - this.dataStreams = new DataOutputStream[0]; - this.indexFiles = new File[0]; - this.dataFiles = new File[0]; - this.lastValues = new byte[0][]; - this.lastValuesLength = new int[0]; - this.dataLengths = new long[0]; - this.maxOffsetLengths = new int[0]; - this.keyCounts = new int[0]; - 
this.bloomFilter = bloomFilter; - } - - @Override - public void put(byte[] key, byte[] value) throws IOException { - int keyLength = key.length; - - // Get the Output stream for that keyLength, each key length has its own file - DataOutputStream indexStream = getIndexStream(keyLength); - - // Write key - indexStream.write(key); - - // Check if the value is identical to the last inserted - byte[] lastValue = lastValues[keyLength]; - boolean sameValue = lastValue != null && Arrays.equals(value, lastValue); - - // Get data stream and length - long dataLength = dataLengths[keyLength]; - if (sameValue) { - dataLength -= lastValuesLength[keyLength]; - } - - // Write offset and record max offset length - int offsetLength = VarLengthIntUtils.encodeLong(indexStream, dataLength); - maxOffsetLengths[keyLength] = Math.max(offsetLength, maxOffsetLengths[keyLength]); - - // Write if data is not the same - if (!sameValue) { - // Get stream - DataOutputStream dataStream = getDataStream(keyLength); - - // Write size and value - int valueSize = VarLengthIntUtils.encodeInt(dataStream, value.length); - dataStream.write(value); - - // Update data length - dataLengths[keyLength] += valueSize + value.length; - - // Update last value - lastValues[keyLength] = value; - lastValuesLength[keyLength] = valueSize + value.length; - - valueCount++; - } - - keyCount++; - keyCounts[keyLength]++; - if (bloomFilter != null) { - bloomFilter.addHash(MurmurHashUtils.hashBytes(key)); - } - } - - @Override - public Context close() throws IOException { - // Close the data and index streams - for (DataOutputStream dos : dataStreams) { - if (dos != null) { - dos.close(); - } - } - for (DataOutputStream dos : indexStreams) { - if (dos != null) { - dos.close(); - } - } - - // Stats - LOG.info("Number of keys: {}", keyCount); - LOG.info("Number of values: {}", valueCount); - - // Prepare files to merge - List filesToMerge = new ArrayList<>(); - - int bloomFilterBytes = bloomFilter == null ? 0 : bloomFilter.getBuffer().size(); - HashContext context = - new HashContext( - bloomFilter != null, - bloomFilter == null ? 
0 : bloomFilter.expectedEntries(), - bloomFilterBytes, - new int[keyCounts.length], - new int[keyCounts.length], - new int[keyCounts.length], - new int[keyCounts.length], - new long[keyCounts.length], - 0, - null); - - long indexesLength = bloomFilterBytes; - long datasLength = 0; - for (int i = 0; i < this.keyCounts.length; i++) { - if (this.keyCounts[i] > 0) { - // Write the key Count - context.keyCounts[i] = keyCounts[i]; - - // Write slot count - int slots = (int) Math.round(keyCounts[i] / loadFactor); - context.slots[i] = slots; - - // Write slot size - int offsetLength = maxOffsetLengths[i]; - context.slotSizes[i] = i + offsetLength; - - // Write index offset - context.indexOffsets[i] = (int) indexesLength; - - // Increment index length - indexesLength += (long) (i + offsetLength) * slots; - - // Write data length - context.dataOffsets[i] = datasLength; - - // Increment data length - datasLength += dataLengths[i]; - } - } - - // adjust data offsets - for (int i = 0; i < context.dataOffsets.length; i++) { - context.dataOffsets[i] = indexesLength + context.dataOffsets[i]; - } - - PageFileOutput output = - PageFileOutput.create(outputFile, compressPageSize, compressionFactory); - try { - // Write bloom filter file - if (bloomFilter != null) { - File bloomFilterFile = new File(tempFolder, "bloomfilter.dat"); - try (FileOutputStream bfOutputStream = new FileOutputStream(bloomFilterFile)) { - bfOutputStream.write(bloomFilter.getBuffer().getArray()); - LOG.info("Bloom filter size: {} bytes", bloomFilter.getBuffer().size()); - } - filesToMerge.add(bloomFilterFile); - } - - // Build index file - for (int i = 0; i < indexFiles.length; i++) { - if (indexFiles[i] != null) { - filesToMerge.add(buildIndex(i)); - } - } - - // Stats collisions - LOG.info("Number of collisions: {}", collisions); - - // Add data files - for (File dataFile : dataFiles) { - if (dataFile != null) { - filesToMerge.add(dataFile); - } - } - - // Merge and write to output - checkFreeDiskSpace(filesToMerge); - mergeFiles(filesToMerge, output); - } finally { - cleanup(filesToMerge); - output.close(); - } - - LOG.info( - "Compressed Total store size: {} Mb", - new DecimalFormat("#,##0.0").format(outputFile.length() / (1024 * 1024))); - - if (output instanceof CompressedPageFileOutput) { - CompressedPageFileOutput compressedOutput = (CompressedPageFileOutput) output; - context = context.copy(compressedOutput.uncompressBytes(), compressedOutput.pages()); - } - return context; - } - - private File buildIndex(int keyLength) throws IOException { - long count = keyCounts[keyLength]; - int slots = (int) Math.round(count / loadFactor); - int offsetLength = maxOffsetLengths[keyLength]; - int slotSize = keyLength + offsetLength; - - // Init index - File indexFile = new File(tempFolder, "index" + keyLength + ".dat"); - try (RandomAccessFile indexAccessFile = new RandomAccessFile(indexFile, "rw")) { - indexAccessFile.setLength((long) slots * slotSize); - FileChannel indexChannel = indexAccessFile.getChannel(); - MappedByteBuffer byteBuffer = - indexChannel.map(FileChannel.MapMode.READ_WRITE, 0, indexAccessFile.length()); - - // Init reading stream - File tempIndexFile = indexFiles[keyLength]; - DataInputStream tempIndexStream = - new DataInputStream( - new BufferedInputStream(new FileInputStream(tempIndexFile))); - try { - byte[] keyBuffer = new byte[keyLength]; - byte[] slotBuffer = new byte[slotSize]; - byte[] offsetBuffer = new byte[offsetLength]; - - // Read all keys - for (int i = 0; i < count; i++) { - // Read key - 
tempIndexStream.readFully(keyBuffer); - - // Read offset - long offset = VarLengthIntUtils.decodeLong(tempIndexStream); - - // Hash - long hash = MurmurHashUtils.hashBytesPositive(keyBuffer); - - boolean collision = false; - for (int probe = 0; probe < count; probe++) { - int slot = (int) ((hash + probe) % slots); - byteBuffer.position(slot * slotSize); - byteBuffer.get(slotBuffer); - - long found = VarLengthIntUtils.decodeLong(slotBuffer, keyLength); - if (found == 0) { - // The spot is empty use it - byteBuffer.position(slot * slotSize); - byteBuffer.put(keyBuffer); - int pos = VarLengthIntUtils.encodeLong(offsetBuffer, offset); - byteBuffer.put(offsetBuffer, 0, pos); - break; - } else { - collision = true; - // Check for duplicates - if (Arrays.equals(keyBuffer, Arrays.copyOf(slotBuffer, keyLength))) { - throw new RuntimeException( - String.format( - "A duplicate key has been found for for key bytes %s", - Arrays.toString(keyBuffer))); - } - } - } - - if (collision) { - collisions++; - } - } - - String msg = - " Max offset length: " - + offsetLength - + " bytes" - + "\n Slot size: " - + slotSize - + " bytes"; - - LOG.info("Built index file {}\n" + msg, indexFile.getName()); - } finally { - // Close input - tempIndexStream.close(); - - // Close index and make sure resources are liberated - indexChannel.close(); - - // Delete temp index file - if (tempIndexFile.delete()) { - LOG.info("Temporary index file {} has been deleted", tempIndexFile.getName()); - } - } - } - - return indexFile; - } - - // Fail if the size of the expected store file exceed 2/3rd of the free disk space - private void checkFreeDiskSpace(List inputFiles) { - // Check for free space - long usableSpace = 0; - long totalSize = 0; - for (File f : inputFiles) { - if (f.exists()) { - totalSize += f.length(); - usableSpace = f.getUsableSpace(); - } - } - LOG.info( - "Total expected store size is {} Mb", - new DecimalFormat("#,##0.0").format(totalSize / (1024 * 1024))); - LOG.info( - "Usable free space on the system is {} Mb", - new DecimalFormat("#,##0.0").format(usableSpace / (1024 * 1024))); - if (totalSize / (double) usableSpace >= 0.66) { - throw new RuntimeException("Aborting because there isn' enough free disk space"); - } - } - - // Merge files to the provided fileChannel - private void mergeFiles(List inputFiles, PageFileOutput output) throws IOException { - long startTime = System.nanoTime(); - - // Merge files - for (File f : inputFiles) { - if (f.exists()) { - FileInputStream fileInputStream = new FileInputStream(f); - BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream); - try { - LOG.info("Merging {} size={}", f.getName(), f.length()); - - byte[] buffer = new byte[8192]; - int length; - while ((length = bufferedInputStream.read(buffer)) > 0) { - output.write(buffer, 0, length); - } - } finally { - bufferedInputStream.close(); - fileInputStream.close(); - } - } else { - LOG.info("Skip merging file {} because it doesn't exist", f.getName()); - } - } - - LOG.info("Time to merge {} s", ((System.nanoTime() - startTime) / 1000000000.0)); - } - - // Cleanup files - private void cleanup(List inputFiles) { - for (File f : inputFiles) { - if (f.exists()) { - if (f.delete()) { - LOG.info("Deleted temporary file {}", f.getName()); - } - } - } - if (tempFolder.delete()) { - LOG.info("Deleted temporary folder at {}", tempFolder.getAbsolutePath()); - } - } - - // Get the data stream for the specified keyLength, create it if needed - private DataOutputStream getDataStream(int keyLength) throws 
IOException { - // Resize array if necessary - if (dataStreams.length <= keyLength) { - dataStreams = Arrays.copyOf(dataStreams, keyLength + 1); - dataFiles = Arrays.copyOf(dataFiles, keyLength + 1); - } - - DataOutputStream dos = dataStreams[keyLength]; - if (dos == null) { - File file = new File(tempFolder, "data" + keyLength + ".dat"); - dataFiles[keyLength] = file; - - dos = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file))); - dataStreams[keyLength] = dos; - - // Write one byte so the zero offset is reserved - dos.writeByte(0); - } - return dos; - } - - // Get the index stream for the specified keyLength, create it if needed - private DataOutputStream getIndexStream(int keyLength) throws IOException { - // Resize array if necessary - if (indexStreams.length <= keyLength) { - indexStreams = Arrays.copyOf(indexStreams, keyLength + 1); - indexFiles = Arrays.copyOf(indexFiles, keyLength + 1); - keyCounts = Arrays.copyOf(keyCounts, keyLength + 1); - maxOffsetLengths = Arrays.copyOf(maxOffsetLengths, keyLength + 1); - lastValues = Arrays.copyOf(lastValues, keyLength + 1); - lastValuesLength = Arrays.copyOf(lastValuesLength, keyLength + 1); - dataLengths = Arrays.copyOf(dataLengths, keyLength + 1); - } - - // Get or create stream - DataOutputStream dos = indexStreams[keyLength]; - if (dos == null) { - File file = new File(tempFolder, "temp_index" + keyLength + ".dat"); - indexFiles[keyLength] = file; - - dos = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file))); - indexStreams[keyLength] = dos; - - dataLengths[keyLength]++; - } - return dos; - } -} diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java index 244d7f9dd7e4..7dcacc9ad150 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java +++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java @@ -51,9 +51,8 @@ public SortLookupStoreFactory( } @Override - public SortLookupStoreReader createReader(File file, Context context) throws IOException { - return new SortLookupStoreReader( - comparator, file, blockSize, (SortContext) context, cacheManager); + public SortLookupStoreReader createReader(File file) throws IOException { + return new SortLookupStoreReader(comparator, file, blockSize, cacheManager); } @Override diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java index 6dbfe130e3bb..fd068baaa27b 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java +++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java @@ -51,15 +51,11 @@ public class SortLookupStoreReader implements LookupStoreReader { private final PageFileInput fileInput; public SortLookupStoreReader( - Comparator comparator, - File file, - int blockSize, - SortContext context, - CacheManager cacheManager) + Comparator comparator, File file, int blockSize, CacheManager cacheManager) throws IOException { this.comparator = comparator; this.filePath = file.getAbsolutePath(); - this.fileSize = context.fileSize(); + this.fileSize = file.length(); this.fileInput = PageFileInput.create(file, blockSize, null, fileSize, null); this.blockCache = new BlockCache(fileInput.file(), cacheManager); diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java index a4bb7c13e832..31f09cfbc65e 100644 --- a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java +++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java @@ -21,7 +21,6 @@ import org.apache.paimon.compression.BlockCompressionFactory; import org.apache.paimon.compression.BlockCompressionType; import org.apache.paimon.compression.BlockCompressor; -import org.apache.paimon.lookup.LookupStoreFactory; import org.apache.paimon.lookup.LookupStoreWriter; import org.apache.paimon.memory.MemorySegment; import org.apache.paimon.memory.MemorySlice; @@ -166,7 +165,7 @@ private BlockHandle writeBlock(BlockWriter blockWriter) throws IOException { } @Override - public LookupStoreFactory.Context close() throws IOException { + public void close() throws IOException { // flush current data block flush(); @@ -195,7 +194,6 @@ public LookupStoreFactory.Context close() throws IOException { LOG.info("totalUncompressedSize: {}", MemorySize.ofBytes(totalUncompressedSize)); LOG.info("totalCompressedSize: {}", MemorySize.ofBytes(totalCompressedSize)); - return new SortContext(position); } private void writeSlice(MemorySlice slice) throws IOException { diff --git a/paimon-common/src/test/java/org/apache/paimon/lookup/hash/HashLookupStoreFactoryTest.java b/paimon-common/src/test/java/org/apache/paimon/lookup/hash/HashLookupStoreFactoryTest.java deleted file mode 100644 index 6444502f2df4..000000000000 --- a/paimon-common/src/test/java/org/apache/paimon/lookup/hash/HashLookupStoreFactoryTest.java +++ /dev/null @@ -1,482 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-
-package org.apache.paimon.lookup.hash;
-
-import org.apache.paimon.compression.CompressOptions;
-import org.apache.paimon.io.DataOutputSerializer;
-import org.apache.paimon.io.cache.CacheManager;
-import org.apache.paimon.lookup.LookupStoreFactory.Context;
-import org.apache.paimon.options.MemorySize;
-import org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
-import org.apache.paimon.testutils.junit.parameterized.Parameters;
-import org.apache.paimon.utils.BloomFilter;
-import org.apache.paimon.utils.MathUtils;
-import org.apache.paimon.utils.VarLengthIntUtils;
-
-import org.apache.commons.math3.random.RandomDataGenerator;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/* This file is based on source code from the PalDB Project (https://github.com/linkedin/PalDB), licensed by the Apache
- * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
- * additional information regarding copyright ownership. */
-
-/** Test for {@link HashLookupStoreFactory}. */
-@ExtendWith(ParameterizedTestExtension.class)
-public class HashLookupStoreFactoryTest {
-
-    @TempDir Path tempDir;
-
-    private final RandomDataGenerator random = new RandomDataGenerator();
-    private final int pageSize = 1024;
-
-    private final boolean enableBloomFilter;
-    private final CompressOptions compress;
-
-    private File file;
-    private HashLookupStoreFactory factory;
-
-    public HashLookupStoreFactoryTest(List<Object> var) {
-        this.enableBloomFilter = (Boolean) var.get(0);
-        this.compress = new CompressOptions((String) var.get(1), 1);
-    }
-
-    @SuppressWarnings("unused")
-    @Parameters(name = "enableBf&compress-{0}")
-    public static List<List<Object>> getVarSeg() {
-        return Arrays.asList(
-                Arrays.asList(true, "none"),
-                Arrays.asList(false, "none"),
-                Arrays.asList(false, "lz4"),
-                Arrays.asList(true, "lz4"));
-    }
-
-    @BeforeEach
-    public void setUp() throws IOException {
-        this.factory =
-                new HashLookupStoreFactory(
-                        new CacheManager(MemorySize.ofMebiBytes(1)), pageSize, 0.75d, compress);
-        this.file = new File(tempDir.toFile(), UUID.randomUUID().toString());
-        if (!file.createNewFile()) {
-            throw new IOException("Can not create file: " + file);
-        }
-    }
-
-    private BloomFilter.Builder createBloomFiler(boolean enabled) {
-        if (!enabled) {
-            return null;
-        }
-        return BloomFilter.builder(100, 0.01);
-    }
-
-    private byte[] toBytes(Object o) {
-        return toBytes(o.toString());
-    }
-
-    private byte[] toBytes(String str) {
-        return str.getBytes(StandardCharsets.UTF_8);
-    }
-
-    @TestTemplate
-    public void testEmpty() throws IOException {
-        HashLookupStoreWriter writer =
-                factory.createWriter(file, createBloomFiler(enableBloomFilter));
-        Context context = writer.close();
-
-        assertThat(file.exists()).isTrue();
-
-        HashLookupStoreReader reader = factory.createReader(file, context);
-
-        assertThat(reader.lookup(toBytes(1))).isNull();
-
-        reader.close();
-    }
-
-    @TestTemplate
-    public void testOneKey() throws IOException {
-        HashLookupStoreWriter writer =
-                factory.createWriter(file, createBloomFiler(enableBloomFilter));
-        writer.put(toBytes(1), toBytes("foo"));
-        Context context = writer.close();
-
-        HashLookupStoreReader reader = factory.createReader(file, context);
-        assertThat(reader.lookup(toBytes(1))).isEqualTo(toBytes("foo"));
-        reader.close();
-    }
-
-    @TestTemplate
-    public void testTwoFirstKeyLength() throws IOException {
-        int key1 = 1;
-        int key2 = 245;
-
-        // Write
-        Context context = writeStore(file, new Object[] {key1, key2}, new Object[] {1, 6});
-
-        // Read
-        HashLookupStoreReader reader = factory.createReader(file, context);
-        assertThat(reader.lookup(toBytes(key1))).isEqualTo(toBytes(1));
-        assertThat(reader.lookup((toBytes(key2)))).isEqualTo(toBytes(6));
-        assertThat(reader.lookup(toBytes(0))).isNull();
-        assertThat(reader.lookup(toBytes(6))).isNull();
-        assertThat(reader.lookup(toBytes(244))).isNull();
-        assertThat(reader.lookup(toBytes(246))).isNull();
-        assertThat(reader.lookup(toBytes(1245))).isNull();
-    }
-
-    @TestTemplate
-    public void testKeyLengthGap() throws IOException {
-        int key1 = 1;
-        int key2 = 2450;
-
-        // Write
-        Context context = writeStore(file, new Object[] {key1, key2}, new Object[] {1, 6});
-
-        // Read
-        HashLookupStoreReader reader = factory.createReader(file, context);
-        assertThat(reader.lookup(toBytes(key1))).isEqualTo(toBytes(1));
-        assertThat(reader.lookup((toBytes(key2)))).isEqualTo(toBytes(6));
-        assertThat(reader.lookup(toBytes(0))).isNull();
-        assertThat(reader.lookup(toBytes(6))).isNull();
-        assertThat(reader.lookup(toBytes(244))).isNull();
-        assertThat(reader.lookup(toBytes(267))).isNull();
-        assertThat(reader.lookup(toBytes(2449))).isNull();
-        assertThat(reader.lookup(toBytes(2451))).isNull();
-        assertThat(reader.lookup(toBytes(2454441))).isNull();
-    }
-
-    @TestTemplate
-    public void testKeyLengthStartTwo() throws IOException {
-        int key1 = 245;
-        int key2 = 2450;
-
-        // Write
-        Context context = writeStore(file, new Object[] {key1, key2}, new Object[] {1, 6});
-
-        // Read
-        HashLookupStoreReader reader = factory.createReader(file, context);
-        assertThat(reader.lookup(toBytes(key1))).isEqualTo(toBytes(1));
-        assertThat(reader.lookup((toBytes(key2)))).isEqualTo(toBytes(6));
-        assertThat(reader.lookup(toBytes(6))).isNull();
-        assertThat(reader.lookup(toBytes(244))).isNull();
-        assertThat(reader.lookup(toBytes(267))).isNull();
-        assertThat(reader.lookup(toBytes(2449))).isNull();
-        assertThat(reader.lookup(toBytes(2451))).isNull();
-        assertThat(reader.lookup(toBytes(2454441))).isNull();
-    }
-
-    @TestTemplate
-    public void testDataOnTwoBuffers() throws IOException {
-        Object[] keys = new Object[] {1, 2, 3};
-        Object[] values =
-                new Object[] {
-                    generateStringData(100), generateStringData(10000), generateStringData(100)
-                };
-
-        int byteSize = toBytes(values[0]).length + toBytes(values[1]).length;
-
-        int pageSize = MathUtils.roundDownToPowerOf2(byteSize - 100);
-
-        factory =
-                new HashLookupStoreFactory(
-                        new CacheManager(MemorySize.ofMebiBytes(1)), pageSize, 0.75d, compress);
-
-        Context context = writeStore(factory, file, keys, values);
-
-        // Read
-        HashLookupStoreReader reader = factory.createReader(file, context);
-        for (int i = 0; i < keys.length; i++) {
-            assertThat(reader.lookup(toBytes(keys[i]))).isEqualTo(toBytes(values[i]));
-        }
-    }
-
-    @TestTemplate
-    public void testDataSizeOnTwoBuffers() throws IOException {
-        Object[] keys = new Object[] {1, 2, 3};
-        Object[] values =
-                new Object[] {
-                    generateStringData(100), generateStringData(10000), generateStringData(100)
-                };
-
-        byte[] b1 = toBytes(values[0]);
-        byte[] b2 = toBytes(values[1]);
-        int byteSize = b1.length + b2.length;
-        int sizeSize =
-                VarLengthIntUtils.encodeInt(new DataOutputSerializer(4), b1.length)
-                        + VarLengthIntUtils.encodeInt(new DataOutputSerializer(4), b2.length);
-
-        int pageSize = MathUtils.roundDownToPowerOf2(byteSize + sizeSize + 3);
-
-        factory =
-                new HashLookupStoreFactory(
-                        new CacheManager(MemorySize.ofMebiBytes(1)), pageSize, 0.75d, compress);
-
-        Context context = writeStore(file, keys, values);
-
-        HashLookupStoreReader reader = factory.createReader(file, context);
-        for (int i = 0; i < keys.length; i++) {
-            assertThat(reader.lookup(toBytes(keys[i]))).isEqualTo(toBytes(values[i]));
-        }
-    }
-
-    @TestTemplate
-    public void testReadStringToString() throws IOException {
-        testReadKeyToString(generateStringKeys(100));
-    }
-
-    @TestTemplate
-    public void testReadIntToString() throws IOException {
-        testReadKeyToString(generateIntKeys(100));
-    }
-
-    @TestTemplate
-    public void testReadDoubleToString() throws IOException {
-        testReadKeyToString(generateDoubleKeys(100));
-    }
-
-    @TestTemplate
-    public void testReadLongToString() throws IOException {
-        testReadKeyToString(generateLongKeys(100));
-    }
-
-    @TestTemplate
-    public void testReadStringToInt() throws IOException {
-        testReadKeyToInt(generateStringKeys(100));
-    }
-
-    @TestTemplate
-    public void testReadByteToInt() throws IOException {
-        testReadKeyToInt(generateByteKeys(100));
-    }
-
-    @TestTemplate
-    public void testReadIntToInt() throws IOException {
-        testReadKeyToInt(generateIntKeys(100));
-    }
-
-    @TestTemplate
-    public void testReadCompoundToString() throws IOException {
-        testReadKeyToString(generateCompoundKeys(100));
-    }
-
-    @TestTemplate
-    public void testReadCompoundByteToString() throws IOException {
-        testReadKeyToString(new Object[] {generateCompoundByteKey()});
-    }
-
-    @TestTemplate
-    public void testCacheExpiration() throws IOException {
-        int len = 1000;
-        ThreadLocalRandom rnd = ThreadLocalRandom.current();
-        Object[] keys = new Object[len];
-        Object[] values = new Object[len];
-        for (int i = 0; i < len; i++) {
-            keys[i] = rnd.nextInt();
-            values[i] = generateStringData(100);
-        }
-
-        // Write
-        Context context = writeStore(file, keys, values);
-
-        // Read
-        factory =
-                new HashLookupStoreFactory(
-                        new CacheManager(new MemorySize(8096)), pageSize, 0.75d, compress);
-        HashLookupStoreReader reader = factory.createReader(file, context);
-        for (int i = 0; i < keys.length; i++) {
-            assertThat(reader.lookup(toBytes(keys[i]))).isEqualTo(toBytes(values[i]));
-        }
-    }
-
-    @TestTemplate
-    public void testIterate() throws IOException {
-        Integer[] keys = generateIntKeys(100);
-        String[] values = generateStringData(keys.length, 12);
-
-        // Write
-        Context context = writeStore(file, keys, values);
-
-        // Sets
-        Set<Integer> keysSet = new HashSet<>(Arrays.asList(keys));
-        Set<String> valuesSet = new HashSet<>(Arrays.asList(values));
-
-        // Read
-        HashLookupStoreReader reader = factory.createReader(file, context);
-        Iterator<Map.Entry<byte[], byte[]>> itr = reader.iterator();
-        for (int i = 0; i < keys.length; i++) {
-            assertThat(itr.hasNext()).isTrue();
-            Map.Entry<byte[], byte[]> entry = itr.next();
-            assertThat(entry).isNotNull();
-            assertThat(keysSet.remove(Integer.valueOf(new String(entry.getKey())))).isTrue();
-            assertThat(valuesSet.remove(new String(entry.getValue()))).isTrue();
-
-            assertThat(reader.lookup(entry.getKey())).isEqualTo(entry.getValue());
-        }
-        assertThat(itr.hasNext()).isFalse();
-        reader.close();
-
-        assertThat(keysSet).isEmpty();
-        assertThat(valuesSet).isEmpty();
-    }
-
-    // UTILITY
-
-    private void testReadKeyToString(Object[] keys) throws IOException {
-        // Write
-        Object[] values = generateStringData(keys.length, 10);
-        Context context = writeStore(file, keys, values);
-
-        // Read
-        HashLookupStoreReader reader = factory.createReader(file, context);
-
-        for (int i = 0; i < keys.length; i++) {
-            assertThat(reader.lookup(toBytes(keys[i]))).isEqualTo(toBytes(values[i]));
-        }
-
-        reader.close();
-    }
-
-    private void testReadKeyToInt(Object[] keys) throws IOException {
-        // Write
-        Integer[] values = generateIntData(keys.length);
-        Context context = writeStore(file, keys, values);
-
-        // Read
-        HashLookupStoreReader reader = factory.createReader(file, context);
-
-        for (int i = 0; i < keys.length; i++) {
-            assertThat(reader.lookup(toBytes(keys[i]))).isEqualTo(toBytes(values[i]));
-        }
-
-        reader.close();
-    }
-
-    private Context writeStore(File location, Object[] keys, Object[] values) throws IOException {
-        return writeStore(factory, location, keys, values);
-    }
-
-    private Context writeStore(
-            HashLookupStoreFactory factory, File location, Object[] keys, Object[] values)
-            throws IOException {
-        HashLookupStoreWriter writer =
-                factory.createWriter(location, createBloomFiler(enableBloomFilter));
-        for (int i = 0; i < keys.length; i++) {
-            writer.put(toBytes(keys[i]), toBytes(values[i]));
-        }
-        return writer.close();
-    }
-
-    private Integer[] generateIntKeys(int count) {
-        Integer[] res = new Integer[count];
-        for (int i = 0; i < count; i++) {
-            res[i] = i;
-        }
-        return res;
-    }
-
-    private String[] generateStringKeys(int count) {
-        String[] res = new String[count];
-        for (int i = 0; i < count; i++) {
-            res[i] = i + "";
-        }
-        return res;
-    }
-
-    private Byte[] generateByteKeys(int count) {
-        if (count > 127) {
-            throw new RuntimeException("Too large range");
-        }
-        Byte[] res = new Byte[count];
-        for (int i = 0; i < count; i++) {
-            res[i] = (byte) i;
-        }
-        return res;
-    }
-
-    private Double[] generateDoubleKeys(int count) {
-        Double[] res = new Double[count];
-        for (int i = 0; i < count; i++) {
-            res[i] = (double) i;
-        }
-        return res;
-    }
-
-    private Long[] generateLongKeys(int count) {
-        Long[] res = new Long[count];
-        for (int i = 0; i < count; i++) {
-            res[i] = (long) i;
-        }
-        return res;
-    }
-
-    private Object[] generateCompoundKeys(int count) {
-        Object[] res = new Object[count];
-        Random random = new Random(345);
-        for (int i = 0; i < count; i++) {
-            Object[] k = new Object[] {(byte) random.nextInt(10), i};
-            res[i] = k;
-        }
-        return res;
-    }
-
-    private Object[] generateCompoundByteKey() {
-        Object[] res = new Object[2];
-        res[0] = (byte) 6;
-        res[1] = (byte) 0;
-        return res;
-    }
-
-    private String generateStringData(int letters) {
-        return random.nextHexString(letters);
-    }
-
-    private String[] generateStringData(int count, int letters) {
-        String[] res = new String[count];
-        for (int i = 0; i < count; i++) {
-            res[i] = random.nextHexString(letters);
-        }
-        return res;
-    }
-
-    private Integer[] generateIntData(int count) {
-        Integer[] res = new Integer[count];
-        Random random = new Random(count + 34593263544354353L);
-        for (int i = 0; i < count; i++) {
-            res[i] = random.nextInt(1000000);
-        }
-        return res;
-    }
-}
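The class removed above was the test for the PalDB-derived hash store; equivalent round-trip coverage now lives in SortLookupStoreFactoryTest, whose diff follows. For orientation, here is a minimal sketch of the same write/read cycle under the sort store's API. The class name, key encoding, and temp-file handling are illustrative assumptions, not part of the patch.

    // Hypothetical round trip mirroring the removed testOneKey; not part of the patch.
    import org.apache.paimon.compression.CompressOptions;
    import org.apache.paimon.data.GenericRow;
    import org.apache.paimon.data.serializer.RowCompactedSerializer;
    import org.apache.paimon.io.cache.CacheManager;
    import org.apache.paimon.lookup.sort.SortLookupStoreFactory;
    import org.apache.paimon.lookup.sort.SortLookupStoreReader;
    import org.apache.paimon.lookup.sort.SortLookupStoreWriter;
    import org.apache.paimon.options.MemorySize;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.RowType;

    import java.io.File;
    import java.nio.file.Files;
    import java.util.Arrays;

    public class SortStoreRoundTrip {
        public static void main(String[] args) throws Exception {
            RowCompactedSerializer keySerializer =
                    new RowCompactedSerializer(RowType.of(DataTypes.INT()));
            SortLookupStoreFactory factory =
                    new SortLookupStoreFactory(
                            keySerializer.createSliceComparator(),
                            new CacheManager(MemorySize.ofMebiBytes(1)),
                            4096, // block size, matching the migrated tests
                            new CompressOptions("none", 1));
            File file = Files.createTempFile("lookup", ".sort").toFile();
            byte[] key = keySerializer.serializeToBytes(GenericRow.of(1));
            byte[] value = {42};
            // null builder = no bloom filter (as in the deleted tests); the sort writer
            // presumably expects keys in comparator order, which one key sidesteps.
            SortLookupStoreWriter writer = factory.createWriter(file, null);
            writer.put(key, value);
            writer.close(); // no LookupStoreFactory.Context to carry to the reader any more
            SortLookupStoreReader reader = factory.createReader(file);
            if (!Arrays.equals(value, reader.lookup(key))) {
                throw new AssertionError("lookup mismatch");
            }
            reader.close();
        }
    }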
diff --git a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java
index 354283a1d495..6d3d0daa156d 100644
--- a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java
@@ -22,7 +22,6 @@
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.serializer.RowCompactedSerializer;
 import org.apache.paimon.io.cache.CacheManager;
-import org.apache.paimon.lookup.LookupStoreFactory.Context;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension;
 import org.apache.paimon.testutils.junit.parameterized.Parameters;
@@ -98,9 +97,9 @@ public void testNormal() throws IOException {
             byte[] bytes = toBytes(i);
             writer.put(bytes, bytes);
         }
-        Context context = writer.close();
+        writer.close();
 
-        SortLookupStoreReader reader = factory.createReader(file, context);
+        SortLookupStoreReader reader = factory.createReader(file);
         for (int i = 0; i < QUERY_COUNT; i++) {
             int query = rnd.nextInt(VALUE_COUNT);
             byte[] bytes = toBytes(query);
@@ -122,9 +121,9 @@ public void testEmpty() throws IOException {
 
         SortLookupStoreWriter writer =
                 factory.createWriter(file, createBloomFiler(bloomFilterEnabled));
-        Context context = writer.close();
+        writer.close();
 
-        SortLookupStoreReader reader = factory.createReader(file, context);
+        SortLookupStoreReader reader = factory.createReader(file);
         byte[] bytes = toBytes(rnd.nextInt(VALUE_COUNT));
         assertThat(reader.lookup(bytes)).isNull();
         reader.close();
@@ -149,9 +148,9 @@ public void testIntKey() throws IOException {
             byte[] bytes = toBytes(keySerializer, row, i);
             writer.put(bytes, toBytes(i));
         }
-        Context context = writer.close();
+        writer.close();
 
-        SortLookupStoreReader reader = factory.createReader(file, context);
+        SortLookupStoreReader reader = factory.createReader(file);
         for (int i = 0; i < QUERY_COUNT; i++) {
             int query = rnd.nextInt(VALUE_COUNT);
             byte[] bytes = toBytes(keySerializer, row, query);
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
index f7c783875fb0..f7904af7ea78 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupLevels.java
@@ -158,7 +158,6 @@ private LookupFile createLookupFile(DataFileMeta file) throws IOException {
         }
         LookupStoreWriter kvWriter =
                 lookupStoreFactory.createWriter(localFile, bfGenerator.apply(file.rowCount()));
-        LookupStoreFactory.Context context;
         try (RecordReader<KeyValue> reader = fileReaderFactory.apply(file)) {
             KeyValue kv;
             if (valueProcessor.withPosition()) {
@@ -187,14 +186,14 @@ private LookupFile createLookupFile(DataFileMeta file) throws IOException {
             FileIOUtils.deleteFileOrDirectory(localFile);
             throw e;
         } finally {
-            context = kvWriter.close();
+            kvWriter.close();
         }
 
         ownCachedFiles.add(file.fileName());
         return new LookupFile(
                 localFile,
                 file,
-                lookupStoreFactory.createReader(localFile, context),
+                lookupStoreFactory.createReader(localFile),
                 () -> ownCachedFiles.remove(file.fileName()));
     }
 
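The production change in LookupLevels.java is the same simplification the tests show: a sorted lookup file is self-describing (its index and bloom-filter metadata sit in the file's own footer), so the writer no longer returns a Context that must be threaded through to createReader. The SPI this implies is sketched below; the interfaces are reconstructed from the call sites in this patch, not copied from the real files.

    import org.apache.paimon.utils.BloomFilter;

    import javax.annotation.Nullable;

    import java.io.Closeable;
    import java.io.File;
    import java.io.IOException;

    // Assumed post-patch shape of the lookup-store SPI (reconstructed, not verbatim).
    interface LookupStoreFactory {
        LookupStoreWriter createWriter(File file, @Nullable BloomFilter.Builder bloomFilter)
                throws IOException;

        // The Context parameter is gone: the file alone is enough to open a reader.
        LookupStoreReader createReader(File file) throws IOException;
    }

    interface LookupStoreWriter extends Closeable {
        void put(byte[] key, byte[] value) throws IOException;

        @Override
        void close() throws IOException; // previously returned a LookupStoreFactory.Context
    }

    interface LookupStoreReader extends Closeable {
        @Nullable
        byte[] lookup(byte[] key) throws IOException;
    }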
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
index 0bef84e7d854..6448d1bdfbad 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java
@@ -24,6 +24,7 @@
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.RowCompactedSerializer;
 import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.format.FlushingFileFormat;
 import org.apache.paimon.fs.FileIOFinder;
@@ -33,7 +34,7 @@
 import org.apache.paimon.io.KeyValueFileWriterFactory;
 import org.apache.paimon.io.RollingFileWriter;
 import org.apache.paimon.io.cache.CacheManager;
-import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
+import org.apache.paimon.lookup.sort.SortLookupStoreFactory;
 import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
@@ -198,10 +199,10 @@ private LookupLevels<Boolean> createContainsLevels(Levels levels, MemorySize max
                 new LookupLevels.ContainsValueProcessor(),
                 file -> createReaderFactory().createRecordReader(file),
                 file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()),
-                new HashLookupStoreFactory(
+                new SortLookupStoreFactory(
+                        new RowCompactedSerializer(keyType).createSliceComparator(),
                         new CacheManager(MemorySize.ofMebiBytes(1)),
-                        2048,
-                        0.75,
+                        4096,
                         new CompressOptions("none", 1)),
                 rowCount -> BloomFilter.builder(rowCount, 0.01),
                 LookupFile.createCache(Duration.ofHours(1), maxDiskSize));
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
index d4ddde5a441e..e264e2ba8e98 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java
@@ -24,6 +24,7 @@
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.RowCompactedSerializer;
 import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.format.FlushingFileFormat;
 import org.apache.paimon.fs.FileIOFinder;
@@ -33,7 +34,7 @@
 import org.apache.paimon.io.KeyValueFileWriterFactory;
 import org.apache.paimon.io.RollingFileWriter;
 import org.apache.paimon.io.cache.CacheManager;
-import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
+import org.apache.paimon.lookup.sort.SortLookupStoreFactory;
 import org.apache.paimon.manifest.FileSource;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
@@ -190,7 +191,7 @@ public void testMaxDiskSize() throws IOException {
         }
         Levels levels = new Levels(comparator, files, 1);
         LookupLevels<KeyValue> lookupLevels =
-                createLookupLevels(levels, MemorySize.ofKibiBytes(20));
+                createLookupLevels(levels, MemorySize.ofKibiBytes(10));
 
         for (int i = 0; i < fileNum * recordInFile; i++) {
             KeyValue kv = lookupLevels.lookup(row(i), 1);
@@ -275,10 +276,10 @@ private LookupLevels<KeyValue> createLookupLevels(Levels levels, MemorySize maxD
                 new LookupLevels.KeyValueProcessor(rowType),
                 file -> createReaderFactory().createRecordReader(file),
                 file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()),
-                new HashLookupStoreFactory(
+                new SortLookupStoreFactory(
+                        new RowCompactedSerializer(keyType).createSliceComparator(),
                         new CacheManager(MemorySize.ofMebiBytes(1)),
-                        2048,
-                        0.75,
+                        4096,
                         new CompressOptions("none", 1)),
                 rowCount -> BloomFilter.builder(rowCount, 0.05),
                 LookupFile.createCache(Duration.ofHours(1), maxDiskSize));
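Both merge-tree tests swap factories the same way: the hash-only knobs (page size 2048, load factor 0.75) drop out, and a key comparator comes in, since a sorted layout needs a total order on keys. Note also that testMaxDiskSize shrinks its budget from 20 KiB to 10 KiB; the patch does not say why, but plausibly the sorted files are small enough that the old budget no longer forced evictions. The parameter mapping, with roles inferred from the diff (this assumes createSliceComparator() yields a Comparator<MemorySlice>):

    import org.apache.paimon.compression.CompressOptions;
    import org.apache.paimon.data.serializer.RowCompactedSerializer;
    import org.apache.paimon.io.cache.CacheManager;
    import org.apache.paimon.lookup.sort.SortLookupStoreFactory;
    import org.apache.paimon.memory.MemorySlice;
    import org.apache.paimon.options.MemorySize;
    import org.apache.paimon.types.RowType;

    import java.util.Comparator;

    class LookupFactoryMigration {
        // Builds the factory exactly as the migrated tests do.
        static SortLookupStoreFactory sortFactory(RowType keyType) {
            // New argument: the key order for the sorted layout.
            Comparator<MemorySlice> keyOrder =
                    new RowCompactedSerializer(keyType).createSliceComparator();
            return new SortLookupStoreFactory(
                    keyOrder,
                    new CacheManager(MemorySize.ofMebiBytes(1)),
                    4096, // replaces pageSize = 2048; presumably the sorted block size
                    new CompressOptions("none", 1));
            // Dropped: load factor 0.75, which only made sense for the hash layout.
        }
    }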
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index ec0675a53af2..6e6519e8ec40 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -20,7 +20,6 @@
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.ChangelogProducer;
-import org.apache.paimon.CoreOptions.LookupLocalFileType;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryString;
@@ -124,7 +123,6 @@
 import static org.apache.paimon.CoreOptions.FILE_FORMAT;
 import static org.apache.paimon.CoreOptions.FILE_FORMAT_PARQUET;
 import static org.apache.paimon.CoreOptions.FILE_FORMAT_PER_LEVEL;
-import static org.apache.paimon.CoreOptions.LOOKUP_LOCAL_FILE_TYPE;
 import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
 import static org.apache.paimon.CoreOptions.METADATA_STATS_MODE;
 import static org.apache.paimon.CoreOptions.METADATA_STATS_MODE_PER_LEVEL;
@@ -2281,11 +2279,7 @@ public void testTableQueryForLookup() throws Exception {
 
     @Test
     public void testTableQueryForLookupLocalSortFile() throws Exception {
         FileStoreTable table =
-                createFileStoreTable(
-                        options -> {
-                            options.set(CHANGELOG_PRODUCER, LOOKUP);
-                            options.set(LOOKUP_LOCAL_FILE_TYPE, LookupLocalFileType.SORT);
-                        });
+                createFileStoreTable(options -> options.set(CHANGELOG_PRODUCER, LOOKUP));
         innerTestTableQuery(table);
     }
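User-facing, the series amounts to one thing: lookup.local-file-type is gone, and the sort format, previously opt-in, is the only local lookup file. Configuration that used to select it explicitly reduces to the changelog producer alone, as in this sketch (whether a leftover lookup.local-file-type key is ignored or rejected is not shown by this patch):

    import org.apache.paimon.CoreOptions;
    import org.apache.paimon.options.Options;

    class LookupTableOptions {
        static Options forLookupChangelog() {
            Options options = new Options();
            options.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.LOOKUP);
            // Nothing else to choose: sorted local files are the only lookup format now.
            return options;
        }
    }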