Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
Expand All @@ -66,6 +68,7 @@
@Measurement(iterations = 25)
public class IndexMergeBenchmark
{

@Param({"5"})
private int numSegments;

Expand All @@ -78,9 +81,13 @@ public class IndexMergeBenchmark
@Param({"true", "false"})
private boolean rollup;

@Param({"OFF_HEAP", "TMP_FILE", "ON_HEAP"})
private SegmentWriteOutType factoryType;


private static final Logger log = new Logger(IndexMergeBenchmark.class);
private static final int RNG_SEED = 9999;
private static final IndexMergerV9 INDEX_MERGER_V9;

private static final IndexIO INDEX_IO;
public static final ObjectMapper JSON_MAPPER;

Expand All @@ -91,6 +98,7 @@ public class IndexMergeBenchmark
private List<QueryableIndex> indexesToMerge;
private BenchmarkSchemaInfo schemaInfo;
private File tmpDir;
private IndexMergerV9 indexMergerV9;

static {
JSON_MAPPER = new DefaultObjectMapper();
Expand All @@ -99,23 +107,16 @@ public class IndexMergeBenchmark
JSON_MAPPER.setInjectableValues(injectableValues);
INDEX_IO = new IndexIO(
JSON_MAPPER,
new ColumnConfig()
{
@Override
public int columnCacheSizeBytes()
{
return 0;
}
}
() -> 0
);
INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
}

@Setup
public void setup() throws IOException
{
log.info("SETUP CALLED AT " + System.currentTimeMillis());

log.info("SETUP CALLED AT " + System.currentTimeMillis());
indexMergerV9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, getSegmentWriteOutMediumFactory(factoryType));
ComplexMetrics.registerSerde("hyperUnique", new HyperUniquesSerde());

indexesToMerge = new ArrayList<>();
Expand Down Expand Up @@ -143,7 +144,7 @@ public void setup() throws IOException
tmpDir = FileUtils.createTempDir();
log.info("Using temp dir: " + tmpDir.getAbsolutePath());

File indexFile = INDEX_MERGER_V9.persist(
File indexFile = indexMergerV9.persist(
incIndex,
tmpDir,
new IndexSpec(),
Expand All @@ -155,26 +156,6 @@ public void setup() throws IOException
}
}

@TearDown
public void tearDown() throws IOException
{
FileUtils.deleteDirectory(tmpDir);
}

private IncrementalIndex makeIncIndex()
{
return new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMetrics(schemaInfo.getAggsArray())
.withRollup(rollup)
.build()
)
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}

@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
Expand All @@ -186,7 +167,7 @@ public void mergeV9(Blackhole blackhole) throws Exception
try {
log.info(tmpFile.getAbsolutePath() + " isFile: " + tmpFile.isFile() + " isDir:" + tmpFile.isDirectory());

File mergedFile = INDEX_MERGER_V9.mergeQueryableIndex(
File mergedFile = indexMergerV9.mergeQueryableIndex(
indexesToMerge,
rollup,
schemaInfo.getAggsArray(),
Expand All @@ -199,8 +180,46 @@ public void mergeV9(Blackhole blackhole) throws Exception
}
finally {
tmpFile.delete();
}
}

@TearDown
public void tearDown() throws IOException
{
FileUtils.deleteDirectory(tmpDir);
}

public enum SegmentWriteOutType
{
TMP_FILE,
OFF_HEAP,
ON_HEAP
}

private SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory(SegmentWriteOutType type)
{
switch (type) {
case TMP_FILE:
return TmpFileSegmentWriteOutMediumFactory.instance();
case OFF_HEAP:
return OffHeapMemorySegmentWriteOutMediumFactory.instance();
case ON_HEAP:
return OnHeapMemorySegmentWriteOutMediumFactory.instance();
}
throw new RuntimeException("Could not create SegmentWriteOutMediumFactory of type: " + type);
}

private IncrementalIndex makeIncIndex()
{
return new IncrementalIndex.Builder()
.setIndexSchema(
new IncrementalIndexSchema.Builder()
.withMetrics(schemaInfo.getAggsArray())
.withRollup(rollup)
.build()
)
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void write(T objectToWrite) throws IOException
}

@Override
public long getSerializedSize() throws IOException
public long getSerializedSize()
{
return headerOut.size() + valueOut.size();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void add(double value) throws IOException
}

@Override
public long getSerializedSize() throws IOException
public long getSerializedSize()
{
return META_SERDE_HELPER.size(this) + valuesOut.size();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void add(float value) throws IOException
}

@Override
public long getSerializedSize() throws IOException
public long getSerializedSize()
{
return META_SERDE_HELPER.size(this) + valuesOut.size();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ private long getOffset(int index) throws IOException
}

@Override
public long getSerializedSize() throws IOException
public long getSerializedSize()
{
if (requireMultipleFiles) {
// for multi-file version (version 2), getSerializedSize() returns number of bytes in meta file.
Expand Down Expand Up @@ -394,7 +394,7 @@ private void writeToMultiFiles(WritableByteChannel channel, FileSmoosher smooshe
*
* @throws IOException
*/
private int bagSizePower() throws IOException
private int bagSizePower()
{
long avgObjectSize = (valuesOut.size() + numWritten - 1) / numWritten;

Expand All @@ -421,7 +421,7 @@ private int bagSizePower() throws IOException
*
* @throws IOException
*/
private boolean actuallyFits(int powerTwo) throws IOException
private boolean actuallyFits(int powerTwo)
{
long lastValueOffset = 0;
long currentValueOffset = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void serialize(ColumnValueSelector<? extends T> selector) throws IOExcept
}

@Override
public long getSerializedSize() throws IOException
public long getSerializedSize()
{
return writer.getSerializedSize();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void serialize(ColumnValueSelector<? extends T> selector) throws IOExcept
}

@Override
public long getSerializedSize() throws IOException
public long getSerializedSize()
{
return writer.getSerializedSize();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,18 +117,18 @@ public int size(T x)

public interface FieldWriter<T>
{
void writeTo(ByteBuffer buffer, T x) throws IOException;
void writeTo(ByteBuffer buffer, T x);

int size(T x);
}

@FunctionalInterface
public interface IntFieldWriter<T> extends FieldWriter<T>
{
int getField(T x) throws IOException;
int getField(T x);

@Override
default void writeTo(ByteBuffer buffer, T x) throws IOException
default void writeTo(ByteBuffer buffer, T x)
{
buffer.putInt(getField(x));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ final class FileWriteOutBytes extends WriteOutBytes
{
private final File file;
private final FileChannel ch;
private long writeOutBytes;

/** Purposely big-endian, for {@link #writeInt(int)} implementation */
private final ByteBuffer buffer = ByteBuffer.allocate(4096); // 4K page sized buffer
Expand All @@ -44,6 +45,7 @@ final class FileWriteOutBytes extends WriteOutBytes
{
this.file = file;
this.ch = ch;
this.writeOutBytes = 0L;
}

private void flushIfNeeded(int bytesNeeded) throws IOException
Expand All @@ -66,13 +68,15 @@ public void write(int b) throws IOException
{
flushIfNeeded(1);
buffer.put((byte) b);
writeOutBytes++;
}

@Override
public void writeInt(int v) throws IOException
{
flushIfNeeded(Integer.BYTES);
buffer.putInt(v);
writeOutBytes += Integer.BYTES;
}

@Override
Expand All @@ -85,14 +89,17 @@ public int write(ByteBuffer src) throws IOException
try {
src.limit(src.position() + buffer.capacity());
buffer.put(src);
writeOutBytes += buffer.capacity();
flush();
}
finally {
// IOException may occur in flush(), reset src limit to the original
src.limit(srcLimit);
}
}
int remaining = src.remaining();
buffer.put(src);
writeOutBytes += remaining;
return len;
}

Expand All @@ -103,10 +110,9 @@ public void write(byte[] b, int off, int len) throws IOException
}

@Override
public long size() throws IOException
public long size()
{
flush();
return ch.size();
return writeOutBytes;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public abstract class WriteOutBytes extends OutputStream implements WritableByte
/**
* Returns the number of bytes written to this WriteOutBytes so far.
*/
public abstract long size() throws IOException;
public abstract long size();

/**
* Takes all bytes that are written to this WriteOutBytes so far and writes them into the given channel.
Expand Down
Loading