diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeWithTmpFileBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeWithTmpFileBenchmark.java
new file mode 100644
index 000000000000..35f2da3b2009
--- /dev/null
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeWithTmpFileBenchmark.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.benchmark.indexing;
+
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
+import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
+import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexMergerV9;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.QueryableIndex;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.incremental.IncrementalIndex;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.serde.ComplexMetrics;
+import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Warmup(iterations = 10)
+@Measurement(iterations = 25)
+public class IndexMergeWithTmpFileBenchmark
+{
+  @Param({"5"})
+  private int numSegments;
+
+  @Param({"75000"})
+  private int rowsPerSegment;
+
+  @Param({"basic"})
+  private String schema;
+
+  @Param({"true", "false"})
+  private boolean rollup;
+
+  private static final Logger log = new Logger(IndexMergeWithTmpFileBenchmark.class);
+  private static final int RNG_SEED = 9999;
+  private static final IndexMergerV9 INDEX_MERGER_V9;
+  private static final IndexIO INDEX_IO;
+  public static final ObjectMapper JSON_MAPPER;
+
+  static {
+    NullHandling.initializeForTests();
+  }
+
+  private List<QueryableIndex> indexesToMerge;
+  private BenchmarkSchemaInfo schemaInfo;
+  // One temp dir per persisted segment, so tearDown() can clean all of them up.
+  private final List<File> tempDirs = new ArrayList<>();
+
+  static {
+    JSON_MAPPER = new DefaultObjectMapper();
+    InjectableValues.Std injectableValues = new InjectableValues.Std();
+    injectableValues.addValue(ExprMacroTable.class, ExprMacroTable.nil());
+    JSON_MAPPER.setInjectableValues(injectableValues);
+    INDEX_IO = new IndexIO(
+        JSON_MAPPER,
+        new ColumnConfig()
+        {
+          @Override
+          public int columnCacheSizeBytes()
+          {
+            return 0;
+          }
+        }
+    );
+    INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, TmpFileSegmentWriteOutMediumFactory.instance());
+  }
+
+  @Setup
+  public void setup() throws IOException
+  {
+    log.info("SETUP CALLED AT " + System.currentTimeMillis());
+
+    ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());
+
+    indexesToMerge = new ArrayList<>();
+
+    schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema);
+
+    for (int i = 0; i < numSegments; i++) {
+      BenchmarkDataGenerator gen = new BenchmarkDataGenerator(
+          schemaInfo.getColumnSchemas(),
+          RNG_SEED + i,
+          schemaInfo.getDataInterval(),
+          rowsPerSegment
+      );
+
+      IncrementalIndex incIndex = makeIncIndex();
+
+      for (int j = 0; j < rowsPerSegment; j++) {
+        InputRow row = gen.nextRow();
+        if (j % 10000 == 0) {
+          log.info(j + " rows generated.");
+        }
+        incIndex.add(row);
+      }
+
+      File tmpDir = FileUtils.createTempDir();
+      tempDirs.add(tmpDir);
+      log.info("Using temp dir: " + tmpDir.getAbsolutePath());
+
+      File indexFile = INDEX_MERGER_V9.persist(
+          incIndex,
+          tmpDir,
+          new IndexSpec(),
+          null
+      );
+
+      QueryableIndex qIndex = INDEX_IO.loadIndex(indexFile);
+      indexesToMerge.add(qIndex);
+    }
+  }
+
+  @TearDown
+  public void tearDown() throws IOException
+  {
+    for (File tempDir : tempDirs) {
+      FileUtils.deleteDirectory(tempDir);
+    }
+  }
+
+  private IncrementalIndex makeIncIndex()
+  {
+    return new IncrementalIndex.Builder()
+        .setIndexSchema(
+            new IncrementalIndexSchema.Builder()
+                .withMetrics(schemaInfo.getAggsArray())
+                .withRollup(rollup)
+                .build()
+        )
+        .setReportParseExceptions(false)
+        .setMaxRowCount(rowsPerSegment)
+        .buildOnheap();
+  }
+
+  @Benchmark
+  @BenchmarkMode(Mode.AverageTime)
+  @OutputTimeUnit(TimeUnit.MICROSECONDS)
+  public void mergeV9(Blackhole blackhole) throws Exception
+  {
+    File tmpFile = File.createTempFile("IndexMergeBenchmark-MERGEDFILE-V9-" + System.currentTimeMillis(), ".TEMPFILE");
+    tmpFile.delete();
+    tmpFile.mkdirs();
+    try {
+      log.info(tmpFile.getAbsolutePath() + " isFile: " + tmpFile.isFile() + " isDir:" + tmpFile.isDirectory());
+
+      File mergedFile = INDEX_MERGER_V9.mergeQueryableIndex(
+          indexesToMerge,
+          rollup,
+          schemaInfo.getAggsArray(),
+          tmpFile,
+          new IndexSpec(),
+          null
+      );
+
+      blackhole.consume(mergedFile);
+    }
+    finally {
+      // tmpFile is a non-empty directory at this point, so a bare File.delete() would fail silently.
+      FileUtils.deleteDirectory(tmpFile);
+    }
+  }
+}
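
For context, a JMH benchmark like the one above is normally launched through the JMH Runner (or via the benchmarks module's shaded jar). The launcher below is a minimal sketch and not part of this patch; the runner class name is hypothetical, and it assumes it lives in the same package as the benchmark:

    import org.openjdk.jmh.runner.Runner;
    import org.openjdk.jmh.runner.RunnerException;
    import org.openjdk.jmh.runner.options.Options;
    import org.openjdk.jmh.runner.options.OptionsBuilder;

    // Hypothetical launcher, for illustration only.
    public class IndexMergeWithTmpFileBenchmarkRunner
    {
      public static void main(String[] args) throws RunnerException
      {
        Options opt = new OptionsBuilder()
            .include(IndexMergeWithTmpFileBenchmark.class.getSimpleName())
            .forks(1)
            .build();
        new Runner(opt).run();
      }
    }
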
diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java
index 9ab579da13d9..b12b15e518b3 100644
--- a/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java
+++ b/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java
@@ -36,6 +36,8 @@ final class FileWriteOutBytes extends WriteOutBytes
 {
   private final File file;
   private final FileChannel ch;
+  /** Number of bytes written so far, including bytes still sitting in {@link #buffer}. */
+  private long writeOutBytes;
 
   /** Purposely big-endian, for {@link #writeInt(int)} implementation */
   private final ByteBuffer buffer = ByteBuffer.allocate(4096); // 4K page sized buffer
@@ -44,6 +46,7 @@ final class FileWriteOutBytes extends WriteOutBytes
   {
     this.file = file;
     this.ch = ch;
+    this.writeOutBytes = 0L;
   }
 
   private void flushIfNeeded(int bytesNeeded) throws IOException
@@ -66,6 +69,7 @@ public void write(int b) throws IOException
   {
     flushIfNeeded(1);
     buffer.put((byte) b);
+    writeOutBytes++;
   }
 
   @Override
@@ -73,6 +77,7 @@ public void writeInt(int v) throws IOException
   {
     flushIfNeeded(Integer.BYTES);
     buffer.putInt(v);
+    writeOutBytes += Integer.BYTES;
   }
 
   @Override
@@ -85,6 +90,7 @@ public int write(ByteBuffer src) throws IOException
       try {
         src.limit(src.position() + buffer.capacity());
         buffer.put(src);
+        writeOutBytes += buffer.capacity();
         flush();
       }
       finally {
@@ -92,7 +98,10 @@ public int write(ByteBuffer src) throws IOException
         src.limit(srcLimit);
       }
     }
+    // Capture remaining() before put() consumes src.
+    int remaining = src.remaining();
     buffer.put(src);
+    writeOutBytes += remaining;
     return len;
   }
 
@@ -103,10 +112,9 @@ public void write(byte[] b, int off, int len) throws IOException
   }
 
   @Override
-  public long size() throws IOException
+  public long size()
   {
-    flush();
-    return ch.size();
+    return writeOutBytes;
   }
 
   @Override
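
The caller-visible change above is that size() becomes a pure in-memory read: previously every call flushed the 4KB buffer and then queried FileChannel.size(), i.e. I/O on a method that serialization code may call frequently. A minimal sketch of the new semantics follows; the factory is the one already referenced in this patch, but the surrounding wiring (makeSegmentWriteOutMedium, makeWriteOutBytes, the temp dir) is my recollection of the writeout package and should be treated as illustrative:

    // Illustrative only; assumes a method that may throw IOException.
    File dir = FileUtils.createTempDir();
    SegmentWriteOutMedium medium =
        TmpFileSegmentWriteOutMediumFactory.instance().makeSegmentWriteOutMedium(dir);
    try {
      WriteOutBytes out = medium.makeWriteOutBytes();
      out.writeInt(42);
      // With this patch, size() returns the tracked count (4) without any file I/O.
      long size = out.size();
    }
    finally {
      medium.close(); // closing the medium also cleans up its temp files
    }
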
diff --git a/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java b/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java
index cfaa4181cdbc..8501fa61eeab 100644
--- a/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.segment.writeout;
 
 import org.easymock.EasyMock;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -36,28 +37,107 @@ public class FileWriteOutBytesTest
   @Before
   public void setUp()
   {
-    this.mockFileChannel = EasyMock.mock(FileChannel.class);
-    this.fileWriteOutBytes = new FileWriteOutBytes(EasyMock.mock(File.class), mockFileChannel);
+    mockFileChannel = EasyMock.mock(FileChannel.class);
+    fileWriteOutBytes = new FileWriteOutBytes(EasyMock.mock(File.class), mockFileChannel);
   }
 
   @Test
-  public void testWrite4KBInts() throws IOException
+  public void write4KBIntsShouldNotFlush() throws IOException
   {
     // Write 4KB of ints and expect the write operation of the file channel will be triggered only once.
-    EasyMock.expect(this.mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class)))
+    EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class)))
             .andAnswer(() -> {
               ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0];
               int remaining = buffer.remaining();
               buffer.position(remaining);
               return remaining;
             }).times(1);
-    EasyMock.replay(this.mockFileChannel);
+    EasyMock.replay(mockFileChannel);
     final int writeBytes = 4096;
     final int numOfInt = writeBytes / Integer.BYTES;
     for (int i = 0; i < numOfInt; i++) {
-      this.fileWriteOutBytes.writeInt(i);
+      fileWriteOutBytes.writeInt(i);
     }
-    this.fileWriteOutBytes.flush();
-    EasyMock.verify(this.mockFileChannel);
+    // No flush is needed while the writes still fit in the 4KB buffer;
+    // the first byte past 4KB forces the single expected flush.
+    fileWriteOutBytes.write(1);
+    EasyMock.verify(mockFileChannel);
+  }
+
+  @Test
+  public void writeShouldIncrementSize() throws IOException
+  {
+    fileWriteOutBytes.write(1);
+    Assert.assertEquals(1, fileWriteOutBytes.size());
+  }
+
+  @Test
+  public void writeIntShouldIncrementSize() throws IOException
+  {
+    fileWriteOutBytes.writeInt(1);
+    Assert.assertEquals(4, fileWriteOutBytes.size());
+  }
+
+  @Test
+  public void writeBufferLargerThanCapacityShouldIncrementSizeCorrectly() throws IOException
+  {
+    EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class)))
+            .andAnswer(() -> {
+              ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0];
+              int remaining = buffer.remaining();
+              buffer.position(remaining);
+              return remaining;
+            }).times(1);
+    EasyMock.replay(mockFileChannel);
+    ByteBuffer src = ByteBuffer.allocate(4096 + 1);
+    fileWriteOutBytes.write(src);
+    Assert.assertEquals(src.capacity(), fileWriteOutBytes.size());
+    EasyMock.verify(mockFileChannel);
+  }
+
+  @Test
+  public void writeBufferLargerThanCapacityThrowsIOEInTheMiddleShouldIncrementSizeCorrectly() throws IOException
+  {
+    EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class)))
+            .andAnswer(() -> {
+              ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0];
+              int remaining = buffer.remaining();
+              buffer.position(remaining);
+              return remaining;
+            }).once();
+    EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class)))
+            .andThrow(new IOException())
+            .once();
+    EasyMock.replay(mockFileChannel);
+    ByteBuffer src = ByteBuffer.allocate(4096 * 2 + 1);
+    try {
+      fileWriteOutBytes.write(src);
+      Assert.fail("IOException should have been thrown.");
+    }
+    catch (IOException e) {
+      // The second flush fails, so size() counts the bytes staged into the buffer: two full 4KB chunks.
+      Assert.assertEquals(4096 * 2, fileWriteOutBytes.size());
+    }
+  }
+
+  @Test
+  public void writeBufferSmallerThanCapacityShouldIncrementSizeCorrectly() throws IOException
+  {
+    ByteBuffer src = ByteBuffer.allocate(4096);
+    fileWriteOutBytes.write(src);
+    Assert.assertEquals(src.capacity(), fileWriteOutBytes.size());
+  }
+
+  @Test
+  public void sizeDoesNotFlush() throws IOException
+  {
+    EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class)))
+            .andThrow(new AssertionError("file channel should not have been written to."));
+    EasyMock.replay(mockFileChannel);
+    long size = fileWriteOutBytes.size();
+    Assert.assertEquals(0, size);
+    fileWriteOutBytes.writeInt(10);
+    size = fileWriteOutBytes.size();
+    Assert.assertEquals(4, size);
   }
 }
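
To exercise just the updated unit tests, a standard Maven invocation from the repository root should work (module path and test class name taken from the diff above; flags may need adjusting to the build setup):

    mvn test -pl processing -Dtest=FileWriteOutBytesTest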