Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 83 additions & 15 deletions processing/src/main/java/io/druid/segment/IndexMerger.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import io.druid.common.utils.JodaUtils;
import io.druid.common.utils.SerializerUtils;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.column.BitmapIndexSeeker;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ColumnCapabilitiesImpl;
import io.druid.segment.column.ValueType;
Expand Down Expand Up @@ -970,19 +969,28 @@ public Rowboat apply(@Nullable Rowboat input)
tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bitmapFactory), bitmapFactory);
}

BitmapIndexSeeker[] bitmapIndexSeeker = new BitmapIndexSeeker[indexes.size()];
DictIdSeeker[] dictIdSeeker = new DictIdSeeker[indexes.size()];
for (int j = 0; j < indexes.size(); j++) {
bitmapIndexSeeker[j] = indexes.get(j).getBitmapIndexSeeker(dimension);
IntBuffer dimConversion = dimConversions.get(j).get(dimension);
if (dimConversion != null) {
dictIdSeeker[j] = new DictIdSeeker((IntBuffer) dimConversion.asReadOnlyBuffer().rewind());
} else {
dictIdSeeker[j] = new DictIdSeeker(null);
}
}
for (String dimVal : IndexedIterable.create(dimVals)) {
//Iterate all dim values's dictionary id in ascending order which in line with dim values's compare result.
for (int dictId = 0; dictId < dimVals.size(); dictId++) {
progress.progress();
List<Iterable<Integer>> convertedInverteds = Lists.newArrayListWithCapacity(indexes.size());
for (int j = 0; j < indexes.size(); ++j) {
convertedInverteds.add(
new ConvertingIndexedInts(
bitmapIndexSeeker[j].seek(dimVal), rowNumConversions.get(j)
)
);
int seekedDictId = dictIdSeeker[j].seek(dictId);
if (seekedDictId != DictIdSeeker.NOT_EXIST) {
convertedInverteds.add(
new ConvertingIndexedInts(
indexes.get(j).getBitmapIndex(dimension, seekedDictId), rowNumConversions.get(j)
)
);
}
}

MutableBitmap bitset = bitmapSerdeFactory.getBitmapFactory().makeEmptyMutableBitmap();
Expand All @@ -999,13 +1007,16 @@ public Rowboat apply(@Nullable Rowboat input)
bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap(bitset)
);

if (isSpatialDim && dimVal != null) {
List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
float[] coords = new float[stringCoords.size()];
for (int j = 0; j < coords.length; j++) {
coords[j] = Float.valueOf(stringCoords.get(j));
if (isSpatialDim) {
String dimVal = dimVals.get(dictId);
if (dimVal != null) {
List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
float[] coords = new float[stringCoords.size()];
for (int j = 0; j < coords.length; j++) {
coords[j] = Float.valueOf(stringCoords.get(j));
}
tree.insert(coords, bitset);
}
tree.insert(coords, bitset);
}
}
writer.close();
Expand Down Expand Up @@ -1193,6 +1204,63 @@ public IntBuffer getConversionBuffer()
}
}

/**
* Get old dictId from new dictId, and only support access in order
*/
public static class DictIdSeeker
{
static final int NOT_EXIST = -1;
static final int NOT_INIT = -1;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can u make both static variables private as well?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, i see that they are used in other places.

private final IntBuffer dimConversions;
private int currIndex;
private int currVal;
private int lastVal;

DictIdSeeker(
IntBuffer dimConversions
)
{
this.dimConversions = dimConversions;
this.currIndex = 0;
this.currVal = NOT_INIT;
this.lastVal = NOT_INIT;
}

public int seek(int dictId)
{
if (dimConversions == null) {
return NOT_EXIST;
}
if (lastVal != NOT_INIT) {
if (dictId <= lastVal) {
throw new ISE("Value dictId[%d] is less than the last value dictId[%d] I have, cannot be.",
dictId, lastVal
);
}
return NOT_EXIST;
}
if (currVal == NOT_INIT) {
currVal = dimConversions.get();
}
if (currVal == dictId) {
int ret = currIndex;
++currIndex;
if (dimConversions.hasRemaining()) {
currVal = dimConversions.get();
} else {
lastVal = dictId;
}
return ret;
} else if (currVal < dictId) {
throw new ISE("Skipped currValue dictId[%d], currIndex[%d]; incoming value dictId[%d]",
currVal, currIndex, dictId
);
} else {
return NOT_EXIST;
}
}
}

public static class ConvertingIndexedInts implements Iterable<Integer>
{
private final IndexedInts baseIndex;
Expand Down
49 changes: 30 additions & 19 deletions processing/src/main/java/io/druid/segment/IndexMergerV9.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import io.druid.collections.CombiningIterable;
import io.druid.common.utils.JodaUtils;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.column.BitmapIndexSeeker;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ColumnCapabilitiesImpl;
Expand All @@ -59,7 +58,6 @@
import io.druid.segment.data.IOPeon;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedIntsWriter;
import io.druid.segment.data.IndexedIterable;
import io.druid.segment.data.IndexedRTree;
import io.druid.segment.data.TmpFileIOPeon;
import io.druid.segment.data.VSizeIndexedIntsWriter;
Expand Down Expand Up @@ -220,7 +218,7 @@ public Metadata apply(IndexableAdapter input)
);
makeInvertedIndexes(
adapters, progress, mergedDimensions, indexSpec, v9TmpDir, rowNumConversions,
nullRowsList, dimValueWriters, bitmapIndexWriters, spatialIndexWriters
nullRowsList, dimValueWriters, bitmapIndexWriters, spatialIndexWriters, dimConversions
);

/************ Finalize Build Columns *************/
Expand Down Expand Up @@ -499,7 +497,8 @@ private void makeInvertedIndexes(
final ArrayList<MutableBitmap> nullRowsList,
final ArrayList<GenericIndexedWriter<String>> dimValueWriters,
final ArrayList<GenericIndexedWriter<ImmutableBitmap>> bitmapIndexWriters,
final ArrayList<ByteBufferWriter<ImmutableRTree>> spatialIndexWriters
final ArrayList<ByteBufferWriter<ImmutableRTree>> spatialIndexWriters,
final ArrayList<Map<String, IntBuffer>> dimConversions
) throws IOException
{
final String section = "build inverted index";
Expand Down Expand Up @@ -527,24 +526,33 @@ private void makeInvertedIndexes(
tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bitmapFactory), bitmapFactory);
}

BitmapIndexSeeker[] bitmapIndexSeeker = new BitmapIndexSeeker[adapters.size()];
DictIdSeeker[] dictIdSeeker = new DictIdSeeker[adapters.size()];
for (int j = 0; j < adapters.size(); j++) {
bitmapIndexSeeker[j] = adapters.get(j).getBitmapIndexSeeker(dimension);
IntBuffer dimConversion = dimConversions.get(j).get(dimension);
if (dimConversion != null) {
dictIdSeeker[j] = new DictIdSeeker((IntBuffer)dimConversion.asReadOnlyBuffer().rewind());
} else {
dictIdSeeker[j] = new DictIdSeeker(null);
}
}

ImmutableBitmap nullRowBitmap = bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap(
nullRowsList.get(dimIndex)
);

for (String dimVal : IndexedIterable.create(dimVals)) {
//Iterate all dim values's dictionary id in ascending order which in line with dim values's compare result.
for (int dictId = 0; dictId < dimVals.size(); dictId++) {
progress.progress();
List<Iterable<Integer>> convertedInverteds = Lists.newArrayListWithCapacity(adapters.size());
for (int j = 0; j < adapters.size(); ++j) {
convertedInverteds.add(
new ConvertingIndexedInts(
bitmapIndexSeeker[j].seek(dimVal), rowNumConversions.get(j)
)
);
int seekedDictId = dictIdSeeker[j].seek(dictId);
if (seekedDictId != DictIdSeeker.NOT_EXIST) {
convertedInverteds.add(
new ConvertingIndexedInts(
adapters.get(j).getBitmapIndex(dimension, seekedDictId), rowNumConversions.get(j)
)
);
}
}

MutableBitmap bitset = bitmapSerdeFactory.getBitmapFactory().makeEmptyMutableBitmap();
Expand All @@ -558,19 +566,22 @@ private void makeInvertedIndexes(
}

ImmutableBitmap bitmapToWrite = bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap(bitset);
if (dimVal == null) {
if ((dictId == 0) && (Iterables.getFirst(dimVals, "") == null)) {
bitmapIndexWriters.get(dimIndex).write(nullRowBitmap.union(bitmapToWrite));
} else {
bitmapIndexWriters.get(dimIndex).write(bitmapToWrite);
}

if (spatialIndexWriter != null && dimVal != null) {
List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
float[] coords = new float[stringCoords.size()];
for (int j = 0; j < coords.length; j++) {
coords[j] = Float.valueOf(stringCoords.get(j));
if (spatialIndexWriter != null) {
String dimVal = dimVals.get(dictId);
if (dimVal != null) {
List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
float[] coords = new float[stringCoords.size()];
for (int j = 0; j < coords.length; j++) {
coords[j] = Float.valueOf(stringCoords.get(j));
}
tree.insert(coords, bitset);
}
tree.insert(coords, bitset);
}
}
if (spatialIndexWriter != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package io.druid.segment;

import io.druid.segment.column.BitmapIndexSeeker;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts;
Expand All @@ -44,7 +43,7 @@ public interface IndexableAdapter

IndexedInts getBitmapIndex(String dimension, String value);

BitmapIndexSeeker getBitmapIndexSeeker(String dimension);
IndexedInts getBitmapIndex(String dimension, int dictId);

String getMetricType(String metric);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,17 @@
import com.metamx.common.guava.CloseQuietly;
import com.metamx.common.logger.Logger;
import io.druid.segment.column.BitmapIndex;
import io.druid.segment.column.BitmapIndexSeeker;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ComplexColumn;
import io.druid.segment.column.DictionaryEncodedColumn;
import io.druid.segment.column.EmptyBitmapIndexSeeker;
import io.druid.segment.column.GenericColumn;
import io.druid.segment.column.IndexedFloatsGenericColumn;
import io.druid.segment.column.IndexedLongsGenericColumn;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.ArrayBasedIndexedInts;
import io.druid.segment.data.BitmapCompressedIndexedInts;
import io.druid.segment.data.EmptyIndexedInts;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.IndexedIterable;
Expand Down Expand Up @@ -303,12 +300,12 @@ public IndexedInts getBitmapIndex(String dimension, String value)
final Column column = input.getColumn(dimension);

if (column == null) {
return new EmptyIndexedInts();
return EmptyIndexedInts.EMPTY_INDEXED_INTS;
}

final BitmapIndex bitmaps = column.getBitmapIndex();
if (bitmaps == null) {
return new EmptyIndexedInts();
return EmptyIndexedInts.EMPTY_INDEXED_INTS;
}

return new BitmapCompressedIndexedInts(bitmaps.getBitmap(value));
Expand Down Expand Up @@ -339,79 +336,23 @@ public ColumnCapabilities getCapabilities(String column)
}

@Override
public BitmapIndexSeeker getBitmapIndexSeeker(String dimension)
public IndexedInts getBitmapIndex(String dimension, int dictId)
{
final Column column = input.getColumn(dimension);

if (column == null) {
return new EmptyBitmapIndexSeeker();
return EmptyIndexedInts.EMPTY_INDEXED_INTS;
}

final BitmapIndex bitmaps = column.getBitmapIndex();
if (bitmaps == null) {
return new EmptyBitmapIndexSeeker();
return EmptyIndexedInts.EMPTY_INDEXED_INTS;
}

final Indexed<String> dimSet = getDimValueLookup(dimension);

// BitmapIndexSeeker is the main performance boost comes from.
// In the previous version of index merge, during the creation of invert index, we do something like
// merge sort of multiply bitmap indexes. It simply iterator all the previous sorted values,
// and "binary find" the id in each bitmap indexes, which involves disk IO and is really slow.
// Suppose we have N (which is 100 in our test) small segments, each have M (which is 50000 in our case) rows.
// In high cardinality scenario, we will almost have N * M uniq values. So the complexity will be O(N * M * M * LOG(M)).

// There are 2 properties we did not use during the merging:
// 1. We always travel the dimension values sequentially
// 2. One single dimension value is valid only in one index when cardinality is high enough
// So we introduced the BitmapIndexSeeker, which can only seek value sequentially and can never seek back.
// By using this and the help of "getDimValueLookup", we only need to translate all dimension value to its ID once,
// and the translation is done by self-increase of the integer. We only need to change the CACHED value once after
// previous value is hit, renew the value and increase the ID. The complexity now is O(N * M * LOG(M)).
return new BitmapIndexSeeker()
{
private int currIndex = 0;
private String currVal = null;
private String lastVal = null;

@Override
public IndexedInts seek(String value)
{
if (dimSet == null || dimSet.size() == 0) {
return new EmptyIndexedInts();
}
if (lastVal != null) {
if (GenericIndexed.STRING_STRATEGY.compare(value, lastVal) <= 0) {
throw new ISE(
"Value[%s] is less than the last value[%s] I have, cannot be.",
value, lastVal
);
}
return new EmptyIndexedInts();
}
if (currVal == null) {
currVal = dimSet.get(currIndex);
}
int compareResult = GenericIndexed.STRING_STRATEGY.compare(currVal, value);
if (compareResult == 0) {
IndexedInts ret = new BitmapCompressedIndexedInts(bitmaps.getBitmap(currIndex));
++currIndex;
if (currIndex == dimSet.size()) {
lastVal = value;
} else {
currVal = dimSet.get(currIndex);
}
return ret;
} else if (compareResult < 0) {
throw new ISE(
"Skipped currValue[%s], currIndex[%,d]; incoming value[%s]",
currVal, currIndex, value
);
} else {
return new EmptyIndexedInts();
}
}
};
if (dictId >= 0) {
return new BitmapCompressedIndexedInts(bitmaps.getBitmap(dictId));
} else {
return EmptyIndexedInts.EMPTY_INDEXED_INTS;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import io.druid.segment.column.BitmapIndexSeeker;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts;
Expand Down Expand Up @@ -95,9 +94,9 @@ public ColumnCapabilities getCapabilities(String column)
}

@Override
public BitmapIndexSeeker getBitmapIndexSeeker(String dimension)
public IndexedInts getBitmapIndex(String dimension, int dictId)
{
return baseAdapter.getBitmapIndexSeeker(dimension);
return baseAdapter.getBitmapIndex(dimension, dictId);
}

@Override
Expand Down
Loading