From c33551d11a0e9ee5cdb4dcbb2f02e0d3ccc864cf Mon Sep 17 00:00:00 2001 From: "huanghui.bigrey" Date: Sat, 18 Apr 2020 13:32:07 +0800 Subject: [PATCH 01/10] optimize FileWriteOutBytes to avoid high sys cpu --- .../IndexMergeWithTmpFileBenchmark.java | 206 ++++++++++++++++++ .../segment/writeout/FileWriteOutBytes.java | 10 +- 2 files changed, 213 insertions(+), 3 deletions(-) create mode 100644 benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeWithTmpFileBenchmark.java diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeWithTmpFileBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeWithTmpFileBenchmark.java new file mode 100644 index 000000000000..35f2da3b2009 --- /dev/null +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeWithTmpFileBenchmark.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.benchmark.indexing; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator; +import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo; +import org.apache.druid.benchmark.datagen.BenchmarkSchemas; +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde; +import org.apache.druid.segment.IndexIO; +import org.apache.druid.segment.IndexMergerV9; +import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.column.ColumnConfig; +import org.apache.druid.segment.incremental.IncrementalIndex; +import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.segment.serde.ComplexMetrics; +import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 10) +@Measurement(iterations = 25) +public class IndexMergeWithTmpFileBenchmark +{ + @Param({"5"}) + private int numSegments; + + @Param({"75000"}) + private int rowsPerSegment; + + @Param({"basic"}) + private String schema; + + @Param({"true", "false"}) + private boolean rollup; + + private static final Logger log = new Logger(IndexMergeWithTmpFileBenchmark.class); + private static final int RNG_SEED = 9999; + private static final IndexMergerV9 INDEX_MERGER_V9; + private static final IndexIO INDEX_IO; + public static final ObjectMapper JSON_MAPPER; + + static { + NullHandling.initializeForTests(); + } + + private List indexesToMerge; + private BenchmarkSchemaInfo schemaInfo; + private File tmpDir; + + static { + JSON_MAPPER = new DefaultObjectMapper(); + InjectableValues.Std injectableValues = new InjectableValues.Std(); + injectableValues.addValue(ExprMacroTable.class, ExprMacroTable.nil()); + JSON_MAPPER.setInjectableValues(injectableValues); + INDEX_IO = new IndexIO( + JSON_MAPPER, + new ColumnConfig() + { + @Override + public int columnCacheSizeBytes() + { + return 0; + } + } + ); + INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, TmpFileSegmentWriteOutMediumFactory.instance()); + } + + @Setup + public void setup() throws IOException + { + log.info("SETUP CALLED AT " + System.currentTimeMillis()); + + ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde()); + + indexesToMerge = new ArrayList<>(); + + schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema); + + for (int i = 0; i < numSegments; i++) { + BenchmarkDataGenerator gen = new BenchmarkDataGenerator( + schemaInfo.getColumnSchemas(), + RNG_SEED + i, + schemaInfo.getDataInterval(), + rowsPerSegment + ); + + IncrementalIndex incIndex = makeIncIndex(); + + for (int j = 0; j < rowsPerSegment; j++) { + InputRow row = gen.nextRow(); + if (j % 10000 == 0) { + log.info(j + " rows generated."); + } + incIndex.add(row); + } + + tmpDir = FileUtils.createTempDir(); + log.info("Using temp dir: " + tmpDir.getAbsolutePath()); + + File indexFile = INDEX_MERGER_V9.persist( + incIndex, + tmpDir, + new IndexSpec(), + null + ); + + QueryableIndex qIndex = INDEX_IO.loadIndex(indexFile); + indexesToMerge.add(qIndex); + } + } + + @TearDown + public void tearDown() throws IOException + { + FileUtils.deleteDirectory(tmpDir); + } + + private IncrementalIndex makeIncIndex() + { + return new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMetrics(schemaInfo.getAggsArray()) + .withRollup(rollup) + .build() + ) + .setReportParseExceptions(false) + .setMaxRowCount(rowsPerSegment) + .buildOnheap(); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public void mergeV9(Blackhole blackhole) throws Exception + { + File tmpFile = File.createTempFile("IndexMergeBenchmark-MERGEDFILE-V9-" + System.currentTimeMillis(), ".TEMPFILE"); + tmpFile.delete(); + tmpFile.mkdirs(); + try { + log.info(tmpFile.getAbsolutePath() + " isFile: " + tmpFile.isFile() + " isDir:" + tmpFile.isDirectory()); + + File mergedFile = INDEX_MERGER_V9.mergeQueryableIndex( + indexesToMerge, + rollup, + schemaInfo.getAggsArray(), + tmpFile, + new IndexSpec(), + null + ); + + blackhole.consume(mergedFile); + } + finally { + tmpFile.delete(); + + } + + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java index 14fb0f33f3e6..c9bf6f4669ad 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java @@ -36,6 +36,7 @@ final class FileWriteOutBytes extends WriteOutBytes { private final File file; private final FileChannel ch; + private long writeOutBytes; /** Purposely big-endian, for {@link #writeInt(int)} implementation */ private final ByteBuffer buffer = ByteBuffer.allocate(4096); // 4K page sized buffer @@ -44,6 +45,7 @@ final class FileWriteOutBytes extends WriteOutBytes { this.file = file; this.ch = ch; + this.writeOutBytes = 0L; } private void flushIfNeeded(int bytesNeeded) throws IOException @@ -66,13 +68,15 @@ public void write(int b) throws IOException { flushIfNeeded(1); buffer.put((byte) b); + writeOutBytes++; } @Override public void writeInt(int v) throws IOException { - flushIfNeeded(Integer.SIZE); + flushIfNeeded(Integer.BYTES); buffer.putInt(v); + writeOutBytes += Integer.BYTES; } @Override @@ -93,6 +97,7 @@ public int write(ByteBuffer src) throws IOException } } buffer.put(src); + writeOutBytes += len; return len; } @@ -105,8 +110,7 @@ public void write(byte[] b, int off, int len) throws IOException @Override public long size() throws IOException { - flush(); - return ch.size(); + return writeOutBytes; } @Override From 149e08c091d06cae25b5d2ba375ae3a846c605b2 Mon Sep 17 00:00:00 2001 From: "huanghui.bigrey" Date: Sat, 18 Apr 2020 14:56:56 +0800 Subject: [PATCH 02/10] optimize FileWriteOutBytes to avoid high sys cpu -- remove IOException --- .../org/apache/druid/segment/writeout/FileWriteOutBytes.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java index c9bf6f4669ad..3c602631e928 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java @@ -108,7 +108,7 @@ public void write(byte[] b, int off, int len) throws IOException } @Override - public long size() throws IOException + public long size() { return writeOutBytes; } From 965f74214820fb8768aea549a0498ef376f02cc8 Mon Sep 17 00:00:00 2001 From: "huanghui.bigrey" Date: Sat, 18 Apr 2020 21:23:29 +0800 Subject: [PATCH 03/10] optimize FileWriteOutBytes to avoid high sys cpu -- remove IOException in writeOutBytes.size --- .../java/org/apache/druid/segment/writeout/WriteOutBytes.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java index f6a5e71ce93f..e1c2972b1485 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java @@ -45,7 +45,7 @@ public abstract class WriteOutBytes extends OutputStream implements WritableByte /** * Returns the number of bytes written to this WriteOutBytes so far. */ - public abstract long size() throws IOException; + public abstract long size(); /** * Takes all bytes that are written to this WriteOutBytes so far and writes them into the given channel. From 2abef0304950bc621842395aa8d7ff84cf4a3b83 Mon Sep 17 00:00:00 2001 From: "huanghui.bigrey" Date: Sat, 18 Apr 2020 23:43:54 +0800 Subject: [PATCH 04/10] Revert "optimize FileWriteOutBytes to avoid high sys cpu -- remove IOException in writeOutBytes.size" This reverts commit 965f7421 --- .../java/org/apache/druid/segment/writeout/WriteOutBytes.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java index e1c2972b1485..f6a5e71ce93f 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java @@ -45,7 +45,7 @@ public abstract class WriteOutBytes extends OutputStream implements WritableByte /** * Returns the number of bytes written to this WriteOutBytes so far. */ - public abstract long size(); + public abstract long size() throws IOException; /** * Takes all bytes that are written to this WriteOutBytes so far and writes them into the given channel. From e9157029efd4145bf713882ce7e5244a250dc1e8 Mon Sep 17 00:00:00 2001 From: "huanghui.bigrey" Date: Sat, 18 Apr 2020 23:44:17 +0800 Subject: [PATCH 05/10] Revert "optimize FileWriteOutBytes to avoid high sys cpu -- remove IOException" This reverts commit 149e08c0 --- .../org/apache/druid/segment/writeout/FileWriteOutBytes.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java index 3c602631e928..c9bf6f4669ad 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java @@ -108,7 +108,7 @@ public void write(byte[] b, int off, int len) throws IOException } @Override - public long size() + public long size() throws IOException { return writeOutBytes; } From a610003437bf476aafad7f45b6f04700cb3b6dca Mon Sep 17 00:00:00 2001 From: "huanghui.bigrey" Date: Sun, 19 Apr 2020 00:16:00 +0800 Subject: [PATCH 06/10] optimize FileWriteOutBytes to avoid high sys cpu -- avoid IOEception never thrown check --- .../org/apache/druid/segment/writeout/FileWriteOutBytes.java | 1 + 1 file changed, 1 insertion(+) diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java index c9bf6f4669ad..2d970deb50b8 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java @@ -110,6 +110,7 @@ public void write(byte[] b, int off, int len) throws IOException @Override public long size() throws IOException { + flushIfNeeded(0); // To avoid check the declared IOException never thrown error return writeOutBytes; } From 69954723bddf223ae1eddb5387de10bcd88b8e7a Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Tue, 21 Apr 2020 14:38:47 -0700 Subject: [PATCH 07/10] Fix size counting to handle IOE in FileWriteOutBytes + tests --- .../segment/writeout/FileWriteOutBytes.java | 7 +- .../writeout/FileWriteOutBytesTest.java | 95 +++++++++++++++++-- 2 files changed, 91 insertions(+), 11 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java index 2d970deb50b8..b12b15e518b3 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/FileWriteOutBytes.java @@ -89,6 +89,7 @@ public int write(ByteBuffer src) throws IOException try { src.limit(src.position() + buffer.capacity()); buffer.put(src); + writeOutBytes += buffer.capacity(); flush(); } finally { @@ -96,8 +97,9 @@ public int write(ByteBuffer src) throws IOException src.limit(srcLimit); } } + int remaining = src.remaining(); buffer.put(src); - writeOutBytes += len; + writeOutBytes += remaining; return len; } @@ -108,9 +110,8 @@ public void write(byte[] b, int off, int len) throws IOException } @Override - public long size() throws IOException + public long size() { - flushIfNeeded(0); // To avoid check the declared IOException never thrown error return writeOutBytes; } diff --git a/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java b/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java index cfaa4181cdbc..8501fa61eeab 100644 --- a/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java +++ b/processing/src/test/java/org/apache/druid/segment/writeout/FileWriteOutBytesTest.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.writeout; import org.easymock.EasyMock; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -36,28 +37,106 @@ public class FileWriteOutBytesTest @Before public void setUp() { - this.mockFileChannel = EasyMock.mock(FileChannel.class); - this.fileWriteOutBytes = new FileWriteOutBytes(EasyMock.mock(File.class), mockFileChannel); + mockFileChannel = EasyMock.mock(FileChannel.class); + fileWriteOutBytes = new FileWriteOutBytes(EasyMock.mock(File.class), mockFileChannel); } @Test - public void testWrite4KBInts() throws IOException + public void write4KBIntsShouldNotFlush() throws IOException { // Write 4KB of ints and expect the write operation of the file channel will be triggered only once. - EasyMock.expect(this.mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class))) + EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class))) .andAnswer(() -> { ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0]; int remaining = buffer.remaining(); buffer.position(remaining); return remaining; }).times(1); - EasyMock.replay(this.mockFileChannel); + EasyMock.replay(mockFileChannel); final int writeBytes = 4096; final int numOfInt = writeBytes / Integer.BYTES; for (int i = 0; i < numOfInt; i++) { - this.fileWriteOutBytes.writeInt(i); + fileWriteOutBytes.writeInt(i); } - this.fileWriteOutBytes.flush(); - EasyMock.verify(this.mockFileChannel); + // no need to flush up to 4KB + // the first byte after 4KB will cause a flush + fileWriteOutBytes.write(1); + EasyMock.verify(mockFileChannel); + } + + @Test + public void writeShouldIncrementSize() throws IOException + { + fileWriteOutBytes.write(1); + Assert.assertEquals(1, fileWriteOutBytes.size()); + } + + @Test + public void writeIntShouldIncrementSize() throws IOException + { + fileWriteOutBytes.writeInt(1); + Assert.assertEquals(4, fileWriteOutBytes.size()); + } + + @Test + public void writeBufferLargerThanCapacityShouldIncrementSizeCorrectly() throws IOException + { + EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class))) + .andAnswer(() -> { + ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0]; + int remaining = buffer.remaining(); + buffer.position(remaining); + return remaining; + }).times(1); + EasyMock.replay(mockFileChannel); + ByteBuffer src = ByteBuffer.allocate(4096 + 1); + fileWriteOutBytes.write(src); + Assert.assertEquals(src.capacity(), fileWriteOutBytes.size()); + EasyMock.verify(mockFileChannel); + } + + @Test + public void writeBufferLargerThanCapacityThrowsIOEInTheMiddleShouldIncrementSizeCorrectly() throws IOException + { + EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class))) + .andAnswer(() -> { + ByteBuffer buffer = (ByteBuffer) EasyMock.getCurrentArguments()[0]; + int remaining = buffer.remaining(); + buffer.position(remaining); + return remaining; + }).once(); + EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class))) + .andThrow(new IOException()) + .once(); + EasyMock.replay(mockFileChannel); + ByteBuffer src = ByteBuffer.allocate(4096 * 2 + 1); + try { + fileWriteOutBytes.write(src); + Assert.fail("IOException should have been thrown."); + } + catch (IOException e) { + // The second invocation to flush bytes fails. So the size should count what has already been put successfully + Assert.assertEquals(4096 * 2, fileWriteOutBytes.size()); + } + } + + @Test + public void writeBufferSmallerThanCapacityShouldIncrementSizeCorrectly() throws IOException + { + ByteBuffer src = ByteBuffer.allocate(4096); + fileWriteOutBytes.write(src); + Assert.assertEquals(src.capacity(), fileWriteOutBytes.size()); + } + @Test + public void sizeDoesNotFlush() throws IOException + { + EasyMock.expect(mockFileChannel.write(EasyMock.anyObject(ByteBuffer.class))) + .andThrow(new AssertionError("file channel should not have been written to.")); + EasyMock.replay(mockFileChannel); + long size = fileWriteOutBytes.size(); + Assert.assertEquals(0, size); + fileWriteOutBytes.writeInt(10); + size = fileWriteOutBytes.size(); + Assert.assertEquals(4, size); } } From 290973536d2566e5e2ecc62507940e7dfc2bf3f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E8=BE=89?= Date: Thu, 23 Apr 2020 00:22:12 +0800 Subject: [PATCH 08/10] remove unused throws IOException in WriteOutBytes.size() --- .../java/org/apache/druid/segment/writeout/WriteOutBytes.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java b/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java index f6a5e71ce93f..e1c2972b1485 100644 --- a/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java +++ b/processing/src/main/java/org/apache/druid/segment/writeout/WriteOutBytes.java @@ -45,7 +45,7 @@ public abstract class WriteOutBytes extends OutputStream implements WritableByte /** * Returns the number of bytes written to this WriteOutBytes so far. */ - public abstract long size() throws IOException; + public abstract long size(); /** * Takes all bytes that are written to this WriteOutBytes so far and writes them into the given channel. From e7876c7bb3897b3565d7d6869f80e0383d08cd23 Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Wed, 22 Apr 2020 13:56:52 -0700 Subject: [PATCH 09/10] Remove redundant throws IOExcpetion clauses --- .../org/apache/druid/segment/data/ByteBufferWriter.java | 2 +- .../segment/data/EntireLayoutColumnarDoublesSerializer.java | 2 +- .../segment/data/EntireLayoutColumnarFloatsSerializer.java | 2 +- .../org/apache/druid/segment/data/GenericIndexedWriter.java | 6 +++--- .../apache/druid/segment/serde/ComplexColumnSerializer.java | 2 +- .../serde/LargeColumnSupportedComplexColumnSerializer.java | 2 +- .../org/apache/druid/segment/serde/MetaSerdeHelper.java | 6 +++--- 7 files changed, 11 insertions(+), 11 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/segment/data/ByteBufferWriter.java b/processing/src/main/java/org/apache/druid/segment/data/ByteBufferWriter.java index ee1e7813dcac..bda81150a681 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/ByteBufferWriter.java +++ b/processing/src/main/java/org/apache/druid/segment/data/ByteBufferWriter.java @@ -63,7 +63,7 @@ public void write(T objectToWrite) throws IOException } @Override - public long getSerializedSize() throws IOException + public long getSerializedSize() { return headerOut.size() + valueOut.size(); } diff --git a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarDoublesSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarDoublesSerializer.java index b33031966cb6..6265535a7345 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarDoublesSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarDoublesSerializer.java @@ -80,7 +80,7 @@ public void add(double value) throws IOException } @Override - public long getSerializedSize() throws IOException + public long getSerializedSize() { return META_SERDE_HELPER.size(this) + valuesOut.size(); } diff --git a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarFloatsSerializer.java b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarFloatsSerializer.java index 75b7290c67f5..a933265d9e14 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarFloatsSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/data/EntireLayoutColumnarFloatsSerializer.java @@ -81,7 +81,7 @@ public void add(float value) throws IOException } @Override - public long getSerializedSize() throws IOException + public long getSerializedSize() { return META_SERDE_HELPER.size(this) + valuesOut.size(); } diff --git a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java index 95bc141d2a5c..d4254ddd98d4 100644 --- a/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java +++ b/processing/src/main/java/org/apache/druid/segment/data/GenericIndexedWriter.java @@ -296,7 +296,7 @@ private long getOffset(int index) throws IOException } @Override - public long getSerializedSize() throws IOException + public long getSerializedSize() { if (requireMultipleFiles) { // for multi-file version (version 2), getSerializedSize() returns number of bytes in meta file. @@ -394,7 +394,7 @@ private void writeToMultiFiles(WritableByteChannel channel, FileSmoosher smooshe * * @throws IOException */ - private int bagSizePower() throws IOException + private int bagSizePower() { long avgObjectSize = (valuesOut.size() + numWritten - 1) / numWritten; @@ -421,7 +421,7 @@ private int bagSizePower() throws IOException * * @throws IOException */ - private boolean actuallyFits(int powerTwo) throws IOException + private boolean actuallyFits(int powerTwo) { long lastValueOffset = 0; long currentValueOffset = 0; diff --git a/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnSerializer.java index 1441c782f18e..c7a2d5923098 100644 --- a/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/serde/ComplexColumnSerializer.java @@ -66,7 +66,7 @@ public void serialize(ColumnValueSelector selector) throws IOExcept } @Override - public long getSerializedSize() throws IOException + public long getSerializedSize() { return writer.getSerializedSize(); } diff --git a/processing/src/main/java/org/apache/druid/segment/serde/LargeColumnSupportedComplexColumnSerializer.java b/processing/src/main/java/org/apache/druid/segment/serde/LargeColumnSupportedComplexColumnSerializer.java index bd68aef5f969..c23c97cb3de0 100644 --- a/processing/src/main/java/org/apache/druid/segment/serde/LargeColumnSupportedComplexColumnSerializer.java +++ b/processing/src/main/java/org/apache/druid/segment/serde/LargeColumnSupportedComplexColumnSerializer.java @@ -106,7 +106,7 @@ public void serialize(ColumnValueSelector selector) throws IOExcept } @Override - public long getSerializedSize() throws IOException + public long getSerializedSize() { return writer.getSerializedSize(); } diff --git a/processing/src/main/java/org/apache/druid/segment/serde/MetaSerdeHelper.java b/processing/src/main/java/org/apache/druid/segment/serde/MetaSerdeHelper.java index ddda31f41cf7..113821cee150 100644 --- a/processing/src/main/java/org/apache/druid/segment/serde/MetaSerdeHelper.java +++ b/processing/src/main/java/org/apache/druid/segment/serde/MetaSerdeHelper.java @@ -117,7 +117,7 @@ public int size(T x) public interface FieldWriter { - void writeTo(ByteBuffer buffer, T x) throws IOException; + void writeTo(ByteBuffer buffer, T x); int size(T x); } @@ -125,10 +125,10 @@ public interface FieldWriter @FunctionalInterface public interface IntFieldWriter extends FieldWriter { - int getField(T x) throws IOException; + int getField(T x); @Override - default void writeTo(ByteBuffer buffer, T x) throws IOException + default void writeTo(ByteBuffer buffer, T x) { buffer.putInt(getField(x)); } From b50f6315cf4c44d8035cf3971790786e5f1b7652 Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Wed, 22 Apr 2020 17:55:10 -0700 Subject: [PATCH 10/10] Parameterize IndexMergeBenchmark --- .../indexing/IndexMergeBenchmark.java | 87 +++++--- .../IndexMergeWithTmpFileBenchmark.java | 206 ------------------ 2 files changed, 53 insertions(+), 240 deletions(-) delete mode 100644 benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeWithTmpFileBenchmark.java diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java index 5f330ee7cf1d..6b3b902c3ad4 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java @@ -35,11 +35,13 @@ import org.apache.druid.segment.IndexMergerV9; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.QueryableIndex; -import org.apache.druid.segment.column.ColumnConfig; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.serde.ComplexMetrics; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory; +import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; +import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; @@ -66,6 +68,7 @@ @Measurement(iterations = 25) public class IndexMergeBenchmark { + @Param({"5"}) private int numSegments; @@ -78,9 +81,13 @@ public class IndexMergeBenchmark @Param({"true", "false"}) private boolean rollup; + @Param({"OFF_HEAP", "TMP_FILE", "ON_HEAP"}) + private SegmentWriteOutType factoryType; + + private static final Logger log = new Logger(IndexMergeBenchmark.class); private static final int RNG_SEED = 9999; - private static final IndexMergerV9 INDEX_MERGER_V9; + private static final IndexIO INDEX_IO; public static final ObjectMapper JSON_MAPPER; @@ -91,6 +98,7 @@ public class IndexMergeBenchmark private List indexesToMerge; private BenchmarkSchemaInfo schemaInfo; private File tmpDir; + private IndexMergerV9 indexMergerV9; static { JSON_MAPPER = new DefaultObjectMapper(); @@ -99,23 +107,16 @@ public class IndexMergeBenchmark JSON_MAPPER.setInjectableValues(injectableValues); INDEX_IO = new IndexIO( JSON_MAPPER, - new ColumnConfig() - { - @Override - public int columnCacheSizeBytes() - { - return 0; - } - } + () -> 0 ); - INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, OffHeapMemorySegmentWriteOutMediumFactory.instance()); } @Setup public void setup() throws IOException { - log.info("SETUP CALLED AT " + System.currentTimeMillis()); + log.info("SETUP CALLED AT " + System.currentTimeMillis()); + indexMergerV9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, getSegmentWriteOutMediumFactory(factoryType)); ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde()); indexesToMerge = new ArrayList<>(); @@ -143,7 +144,7 @@ public void setup() throws IOException tmpDir = FileUtils.createTempDir(); log.info("Using temp dir: " + tmpDir.getAbsolutePath()); - File indexFile = INDEX_MERGER_V9.persist( + File indexFile = indexMergerV9.persist( incIndex, tmpDir, new IndexSpec(), @@ -155,26 +156,6 @@ public void setup() throws IOException } } - @TearDown - public void tearDown() throws IOException - { - FileUtils.deleteDirectory(tmpDir); - } - - private IncrementalIndex makeIncIndex() - { - return new IncrementalIndex.Builder() - .setIndexSchema( - new IncrementalIndexSchema.Builder() - .withMetrics(schemaInfo.getAggsArray()) - .withRollup(rollup) - .build() - ) - .setReportParseExceptions(false) - .setMaxRowCount(rowsPerSegment) - .buildOnheap(); - } - @Benchmark @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MICROSECONDS) @@ -186,7 +167,7 @@ public void mergeV9(Blackhole blackhole) throws Exception try { log.info(tmpFile.getAbsolutePath() + " isFile: " + tmpFile.isFile() + " isDir:" + tmpFile.isDirectory()); - File mergedFile = INDEX_MERGER_V9.mergeQueryableIndex( + File mergedFile = indexMergerV9.mergeQueryableIndex( indexesToMerge, rollup, schemaInfo.getAggsArray(), @@ -199,8 +180,46 @@ public void mergeV9(Blackhole blackhole) throws Exception } finally { tmpFile.delete(); + } + } + + @TearDown + public void tearDown() throws IOException + { + FileUtils.deleteDirectory(tmpDir); + } + + public enum SegmentWriteOutType + { + TMP_FILE, + OFF_HEAP, + ON_HEAP + } + private SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory(SegmentWriteOutType type) + { + switch (type) { + case TMP_FILE: + return TmpFileSegmentWriteOutMediumFactory.instance(); + case OFF_HEAP: + return OffHeapMemorySegmentWriteOutMediumFactory.instance(); + case ON_HEAP: + return OnHeapMemorySegmentWriteOutMediumFactory.instance(); } + throw new RuntimeException("Could not create SegmentWriteOutMediumFactory of type: " + type); + } + private IncrementalIndex makeIncIndex() + { + return new IncrementalIndex.Builder() + .setIndexSchema( + new IncrementalIndexSchema.Builder() + .withMetrics(schemaInfo.getAggsArray()) + .withRollup(rollup) + .build() + ) + .setReportParseExceptions(false) + .setMaxRowCount(rowsPerSegment) + .buildOnheap(); } } diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeWithTmpFileBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeWithTmpFileBenchmark.java deleted file mode 100644 index 35f2da3b2009..000000000000 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeWithTmpFileBenchmark.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.benchmark.indexing; - -import com.fasterxml.jackson.databind.InjectableValues; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator; -import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo; -import org.apache.druid.benchmark.datagen.BenchmarkSchemas; -import org.apache.druid.common.config.NullHandling; -import org.apache.druid.data.input.InputRow; -import org.apache.druid.jackson.DefaultObjectMapper; -import org.apache.druid.java.util.common.FileUtils; -import org.apache.druid.java.util.common.logger.Logger; -import org.apache.druid.math.expr.ExprMacroTable; -import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde; -import org.apache.druid.segment.IndexIO; -import org.apache.druid.segment.IndexMergerV9; -import org.apache.druid.segment.IndexSpec; -import org.apache.druid.segment.QueryableIndex; -import org.apache.druid.segment.column.ColumnConfig; -import org.apache.druid.segment.incremental.IncrementalIndex; -import org.apache.druid.segment.incremental.IncrementalIndexSchema; -import org.apache.druid.segment.serde.ComplexMetrics; -import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory; -import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.BenchmarkMode; -import org.openjdk.jmh.annotations.Fork; -import org.openjdk.jmh.annotations.Measurement; -import org.openjdk.jmh.annotations.Mode; -import org.openjdk.jmh.annotations.OutputTimeUnit; -import org.openjdk.jmh.annotations.Param; -import org.openjdk.jmh.annotations.Scope; -import org.openjdk.jmh.annotations.Setup; -import org.openjdk.jmh.annotations.State; -import org.openjdk.jmh.annotations.TearDown; -import org.openjdk.jmh.annotations.Warmup; -import org.openjdk.jmh.infra.Blackhole; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; - -@State(Scope.Benchmark) -@Fork(value = 1) -@Warmup(iterations = 10) -@Measurement(iterations = 25) -public class IndexMergeWithTmpFileBenchmark -{ - @Param({"5"}) - private int numSegments; - - @Param({"75000"}) - private int rowsPerSegment; - - @Param({"basic"}) - private String schema; - - @Param({"true", "false"}) - private boolean rollup; - - private static final Logger log = new Logger(IndexMergeWithTmpFileBenchmark.class); - private static final int RNG_SEED = 9999; - private static final IndexMergerV9 INDEX_MERGER_V9; - private static final IndexIO INDEX_IO; - public static final ObjectMapper JSON_MAPPER; - - static { - NullHandling.initializeForTests(); - } - - private List indexesToMerge; - private BenchmarkSchemaInfo schemaInfo; - private File tmpDir; - - static { - JSON_MAPPER = new DefaultObjectMapper(); - InjectableValues.Std injectableValues = new InjectableValues.Std(); - injectableValues.addValue(ExprMacroTable.class, ExprMacroTable.nil()); - JSON_MAPPER.setInjectableValues(injectableValues); - INDEX_IO = new IndexIO( - JSON_MAPPER, - new ColumnConfig() - { - @Override - public int columnCacheSizeBytes() - { - return 0; - } - } - ); - INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, TmpFileSegmentWriteOutMediumFactory.instance()); - } - - @Setup - public void setup() throws IOException - { - log.info("SETUP CALLED AT " + System.currentTimeMillis()); - - ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde()); - - indexesToMerge = new ArrayList<>(); - - schemaInfo = BenchmarkSchemas.SCHEMA_MAP.get(schema); - - for (int i = 0; i < numSegments; i++) { - BenchmarkDataGenerator gen = new BenchmarkDataGenerator( - schemaInfo.getColumnSchemas(), - RNG_SEED + i, - schemaInfo.getDataInterval(), - rowsPerSegment - ); - - IncrementalIndex incIndex = makeIncIndex(); - - for (int j = 0; j < rowsPerSegment; j++) { - InputRow row = gen.nextRow(); - if (j % 10000 == 0) { - log.info(j + " rows generated."); - } - incIndex.add(row); - } - - tmpDir = FileUtils.createTempDir(); - log.info("Using temp dir: " + tmpDir.getAbsolutePath()); - - File indexFile = INDEX_MERGER_V9.persist( - incIndex, - tmpDir, - new IndexSpec(), - null - ); - - QueryableIndex qIndex = INDEX_IO.loadIndex(indexFile); - indexesToMerge.add(qIndex); - } - } - - @TearDown - public void tearDown() throws IOException - { - FileUtils.deleteDirectory(tmpDir); - } - - private IncrementalIndex makeIncIndex() - { - return new IncrementalIndex.Builder() - .setIndexSchema( - new IncrementalIndexSchema.Builder() - .withMetrics(schemaInfo.getAggsArray()) - .withRollup(rollup) - .build() - ) - .setReportParseExceptions(false) - .setMaxRowCount(rowsPerSegment) - .buildOnheap(); - } - - @Benchmark - @BenchmarkMode(Mode.AverageTime) - @OutputTimeUnit(TimeUnit.MICROSECONDS) - public void mergeV9(Blackhole blackhole) throws Exception - { - File tmpFile = File.createTempFile("IndexMergeBenchmark-MERGEDFILE-V9-" + System.currentTimeMillis(), ".TEMPFILE"); - tmpFile.delete(); - tmpFile.mkdirs(); - try { - log.info(tmpFile.getAbsolutePath() + " isFile: " + tmpFile.isFile() + " isDir:" + tmpFile.isDirectory()); - - File mergedFile = INDEX_MERGER_V9.mergeQueryableIndex( - indexesToMerge, - rollup, - schemaInfo.getAggsArray(), - tmpFile, - new IndexSpec(), - null - ); - - blackhole.consume(mergedFile); - } - finally { - tmpFile.delete(); - - } - - } -}