Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 16 additions & 9 deletions processing/src/main/java/io/druid/segment/IndexMerger.java
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,7 @@ public Metadata apply(IndexableAdapter input)
ArrayList<FileOutputSupplier> dimOuts = Lists.newArrayListWithCapacity(mergedDimensions.size());
Map<String, Integer> dimensionCardinalities = Maps.newHashMap();
ArrayList<Map<String, IntBuffer>> dimConversions = Lists.newArrayListWithCapacity(indexes.size());
final ArrayList<Boolean> convertMissingDimsFlags = Lists.newArrayListWithCapacity(mergedDimensions.size());

for (IndexableAdapter index : indexes) {
dimConversions.add(Maps.<String, IntBuffer>newHashMap());
Expand All @@ -641,6 +642,7 @@ public Metadata apply(IndexableAdapter input)
List<Indexed<String>> dimValueLookups = Lists.newArrayListWithCapacity(indexes.size() + 1);
DimValueConverter[] converters = new DimValueConverter[indexes.size()];
boolean dimHasValues = false;
boolean dimAbsentFromSomeIndex = false;
boolean[] dimHasValuesByIndex = new boolean[indexes.size()];

for (int i = 0; i < indexes.size(); i++) {
Expand All @@ -651,17 +653,22 @@ public Metadata apply(IndexableAdapter input)
dimValueLookups.add(dimValues);
converters[i] = new DimValueConverter(dimValues);
} else {
dimAbsentFromSomeIndex = true;
dimHasValuesByIndex[i] = false;
}
}

boolean convertMissingDims = dimHasValues && dimAbsentFromSomeIndex;
convertMissingDimsFlags.add(convertMissingDims);

/*
* Ensure the empty str is always in the dictionary if column is not null across indexes
* Ensure the empty str is always in the dictionary if the dimension was missing from one index but
* has non-null values in another index.
* This is done so that MMappedIndexRowIterable can convert null columns to empty strings
* later on, to allow rows from indexes with no values at all for a dimension to merge correctly with
* rows from indexes with partial null values for that dimension.
* later on, to allow rows from indexes without a particular dimension to merge correctly with
* rows from indexes with null/empty str values for that dimension.
*/
if (dimHasValues) {
if (convertMissingDims) {
dimValueLookups.add(EMPTY_STR_DIM_VAL);
for (int i = 0; i < indexes.size(); i++) {
if (!dimHasValuesByIndex[i]) {
Expand Down Expand Up @@ -786,7 +793,7 @@ public Rowboat apply(@Nullable Rowboat input)
}
),
mergedDimensions, dimConversions.get(i), i,
dimensionCardinalities
convertMissingDimsFlags
)
);
}
Expand Down Expand Up @@ -1220,22 +1227,22 @@ public static class MMappedIndexRowIterable implements Iterable<Rowboat>
private final List<String> convertedDims;
private final Map<String, IntBuffer> converters;
private final int indexNumber;
private final Map<String, Integer> dimCardinalities;
private final ArrayList<Boolean> convertMissingDimsFlags;
private static final int[] EMPTY_STR_DIM = new int[]{0};

MMappedIndexRowIterable(
Iterable<Rowboat> index,
List<String> convertedDims,
Map<String, IntBuffer> converters,
int indexNumber,
Map<String, Integer> dimCardinalities
ArrayList<Boolean> convertMissingDimsFlags
)
{
this.index = index;
this.convertedDims = convertedDims;
this.converters = converters;
this.indexNumber = indexNumber;
this.dimCardinalities = dimCardinalities;
this.convertMissingDimsFlags = convertMissingDimsFlags;
}

public Iterable<Rowboat> getIndex()
Expand Down Expand Up @@ -1280,7 +1287,7 @@ public Rowboat apply(@Nullable Rowboat input)
}

if (dims[i] == null) {
if (dimCardinalities.get(dimName) > 0) {
if (convertMissingDimsFlags.get(i)) {
newDims[i] = EMPTY_STR_DIM;
}
continue;
Expand Down
56 changes: 41 additions & 15 deletions processing/src/main/java/io/druid/segment/IndexMergerV9.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import io.druid.segment.column.ColumnCapabilitiesImpl;
import io.druid.segment.column.ColumnDescriptor;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.ArrayIndexed;
import io.druid.segment.data.BitmapSerdeFactory;
import io.druid.segment.data.ByteBufferWriter;
import io.druid.segment.data.CompressedObjectStrategy;
Expand Down Expand Up @@ -175,15 +174,24 @@ public Metadata apply(IndexableAdapter input)
final ArrayList<GenericIndexedWriter<String>> dimValueWriters = setupDimValueWriters(ioPeon, mergedDimensions);
final ArrayList<Map<String, IntBuffer>> dimConversions = Lists.newArrayListWithCapacity(adapters.size());
final ArrayList<Boolean> dimensionSkipFlag = Lists.newArrayListWithCapacity(mergedDimensions.size());
final ArrayList<Boolean> dimHasNullFlags = Lists.newArrayListWithCapacity(mergedDimensions.size());
final ArrayList<Boolean> convertMissingDimsFlags = Lists.newArrayListWithCapacity(mergedDimensions.size());
writeDimValueAndSetupDimConversion(
adapters, progress, mergedDimensions, dimCardinalities, dimValueWriters, dimensionSkipFlag, dimConversions
adapters, progress, mergedDimensions, dimCardinalities, dimValueWriters, dimensionSkipFlag, dimConversions,
convertMissingDimsFlags, dimHasNullFlags
);
log.info("Completed dim conversions in %,d millis.", System.currentTimeMillis() - startTime);

/************* Walk through data sets, merge them, and write merged columns *************/
progress.progress();
final Iterable<Rowboat> theRows = makeRowIterable(
adapters, mergedDimensions, mergedMetrics, dimConversions, dimCardinalities, rowMergerFn
adapters,
mergedDimensions,
mergedMetrics,
dimConversions,
dimCardinalities,
convertMissingDimsFlags,
rowMergerFn
);
final LongColumnSerializer timeWriter = setupTimeWriter(ioPeon);
final ArrayList<IndexedIntsWriter> dimWriters = setupDimensionWriters(
Expand All @@ -199,7 +207,7 @@ public Metadata apply(IndexableAdapter input)
}
mergeIndexesAndWriteColumns(
adapters, progress, theRows, timeWriter, dimWriters, metWriters,
dimensionSkipFlag, rowNumConversions, nullRowsList
dimensionSkipFlag, rowNumConversions, nullRowsList, dimHasNullFlags
);

/************ Create Inverted Indexes *************/
Expand Down Expand Up @@ -527,9 +535,6 @@ private void makeInvertedIndexes(
ImmutableBitmap nullRowBitmap = bitmapSerdeFactory.getBitmapFactory().makeImmutableBitmap(
nullRowsList.get(dimIndex)
);
if (Iterables.getFirst(dimVals, "") != null && !nullRowsList.get(dimIndex).isEmpty()) {
bitmapIndexWriters.get(dimIndex).write(nullRowBitmap);
}

for (String dimVal : IndexedIterable.create(dimVals)) {
progress.progress();
Expand Down Expand Up @@ -636,7 +641,8 @@ private void mergeIndexesAndWriteColumns(
final ArrayList<GenericColumnSerializer> metWriters,
final ArrayList<Boolean> dimensionSkipFlag,
final List<IntBuffer> rowNumConversions,
final ArrayList<MutableBitmap> nullRowsList
final ArrayList<MutableBitmap> nullRowsList,
final ArrayList<Boolean> dimHasNullFlags
) throws IOException
{
final String section = "walk through and merge rows";
Expand Down Expand Up @@ -665,7 +671,11 @@ private void mergeIndexesAndWriteColumns(
if (dimensionSkipFlag.get(i)) {
continue;
}
if (dims[i] == null || dims[i].length == 0 || (dims[i].length == 1 && dims[i][0] == 0)) {
if (dims[i] == null || dims[i].length == 0) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this logic is tricky, so it is worth adding a comment here explaining what is happening.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fjy added a comment here

nullRowsList.get(i).add(rowCount);
} else if (dimHasNullFlags.get(i) && dims[i].length == 1 && dims[i][0] == 0) {
// If this dimension has the null/empty str in its dictionary, a row with a single-valued dimension
// that matches the null/empty str's dictionary ID should also be added to nullRowsList.
nullRowsList.get(i).add(rowCount);
}
dimWriters.get(i).add(dims[i]);
Expand Down Expand Up @@ -779,6 +789,7 @@ private Iterable<Rowboat> makeRowIterable(
final List<String> mergedMetrics,
final ArrayList<Map<String, IntBuffer>> dimConversions,
final Map<String, Integer> dimCardinalities,
final ArrayList<Boolean> convertMissingDimsFlags,
final Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>> rowMergerFn
)
{
Expand Down Expand Up @@ -836,7 +847,7 @@ public Rowboat apply(Rowboat input)
mergedDimensions,
dimConversions.get(i),
i,
dimCardinalities
convertMissingDimsFlags
)
);
}
Expand Down Expand Up @@ -868,7 +879,9 @@ private void writeDimValueAndSetupDimConversion(
final Map<String, Integer> dimensionCardinalities,
final ArrayList<GenericIndexedWriter<String>> dimValueWriters,
final ArrayList<Boolean> dimensionSkipFlag,
final List<Map<String, IntBuffer>> dimConversions
final List<Map<String, IntBuffer>> dimConversions,
final ArrayList<Boolean> convertMissingDimsFlags,
final ArrayList<Boolean> dimHasNullFlags
) throws IOException
{
final String section = "setup dimension conversions";
Expand All @@ -889,6 +902,8 @@ private void writeDimValueAndSetupDimConversion(
DimValueConverter[] converters = new DimValueConverter[adapters.size()];

boolean dimHasValues = false;
boolean dimAbsentFromSomeIndex = false;
boolean dimHasNull = false;
boolean[] dimHasValuesByIndex = new boolean[adapters.size()];
for (int i = 0; i < adapters.size(); i++) {
Indexed<String> dimValues = adapters.get(i).getDimValueLookup(dimension);
Expand All @@ -898,17 +913,22 @@ private void writeDimValueAndSetupDimConversion(
dimValueLookups.add(dimValues);
converters[i] = new DimValueConverter(dimValues);
} else {
dimAbsentFromSomeIndex = true;
dimHasValuesByIndex[i] = false;
}
}

boolean convertMissingDims = dimHasValues && dimAbsentFromSomeIndex;
convertMissingDimsFlags.add(convertMissingDims);

/*
* Ensure the empty str is always in the dictionary if column is not null across indexes
* Ensure the empty str is always in the dictionary if the dimension was missing from one index but
* has non-null values in another index.
* This is done so that MMappedIndexRowIterable can convert null columns to empty strings
* later on, to allow rows from indexes with no values at all for a dimension to merge correctly with
* rows from indexes with partial null values for that dimension.
* later on, to allow rows from indexes without a particular dimension to merge correctly with
* rows from indexes with null/empty str values for that dimension.
*/
if (dimHasValues) {
if (convertMissingDims) {
dimValueLookups.add(EMPTY_STR_DIM_VAL);
for (int i = 0; i < adapters.size(); i++) {
if (!dimHasValuesByIndex[i]) {
Expand Down Expand Up @@ -948,6 +968,10 @@ public String apply(@Nullable String input)
value = value == null ? "" : value;
writer.write(value);

if (value.length() == 0) {
dimHasNull = true;
}

for (int i = 0; i < adapters.size(); i++) {
DimValueConverter converter = converters[i];
if (converter != null) {
Expand All @@ -956,6 +980,8 @@ public String apply(@Nullable String input)
}
++cardinality;
}
// Mark if this dim has the null/empty str value in its dictionary, used for determining nullRowsList later.
dimHasNullFlags.add(dimHasNull);

log.info(
"Completed dim[%s] conversions with cardinality[%,d] in %,d millis.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public SegmentMetadataQueryTest()
new ColumnAnalysis(
ValueType.STRING.toString(),
10881,
2,
1,
null
)
), 71982,
Expand Down Expand Up @@ -135,7 +135,7 @@ public void testSegmentMetadataQueryWithDefaultAnalysisMerge()
new ColumnAnalysis(
ValueType.STRING.toString(),
21762,
2,
1,
null
)
),
Expand Down
18 changes: 9 additions & 9 deletions processing/src/test/java/io/druid/segment/IndexMergerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,8 @@ public void testPersistWithDifferentDims() throws Exception
}

Assert.assertEquals(2, boatList.size());
Assert.assertArrayEquals(new int[][]{{1}, {1}}, boatList.get(0).getDims());
Assert.assertArrayEquals(new int[][]{{2}, {0}}, boatList.get(1).getDims());
Assert.assertArrayEquals(new int[][]{{0}, {1}}, boatList.get(0).getDims());
Assert.assertArrayEquals(new int[][]{{1}, {0}}, boatList.get(1).getDims());

checkBitmapIndex(new ArrayList<Integer>(), adapter.getBitmapIndex("dim1", ""));
checkBitmapIndex(Lists.newArrayList(0), adapter.getBitmapIndex("dim1", "1"));
Expand Down Expand Up @@ -845,11 +845,11 @@ public void testNonLexicographicDimOrderMerge() throws Exception

Assert.assertEquals(ImmutableList.of("d3", "d1", "d2"), ImmutableList.copyOf(adapter.getDimensionNames()));
Assert.assertEquals(3, boatList.size());
Assert.assertArrayEquals(new int[][]{{1}, {1}, {3}}, boatList.get(0).getDims());
Assert.assertArrayEquals(new int[][]{{0}, {0}, {2}}, boatList.get(0).getDims());
Assert.assertArrayEquals(new Object[]{3L}, boatList.get(0).getMetrics());
Assert.assertArrayEquals(new int[][]{{2}, {3}, {1}}, boatList.get(1).getDims());
Assert.assertArrayEquals(new int[][]{{1}, {2}, {0}}, boatList.get(1).getDims());
Assert.assertArrayEquals(new Object[]{3L}, boatList.get(1).getMetrics());
Assert.assertArrayEquals(new int[][]{{3}, {2}, {2}}, boatList.get(2).getDims());
Assert.assertArrayEquals(new int[][]{{2}, {1}, {1}}, boatList.get(2).getDims());
Assert.assertArrayEquals(new Object[]{3L}, boatList.get(2).getMetrics());

checkBitmapIndex(new ArrayList<Integer>(), adapter.getBitmapIndex("d3", ""));
Expand Down Expand Up @@ -1189,10 +1189,10 @@ public void testJointDimMerge() throws Exception
ImmutableList.copyOf(adapter.getDimensionNames())
);
Assert.assertEquals(4, boatList.size());
Assert.assertArrayEquals(new int[][]{{0}, {1}, {0}, {0}, {0}, {0}, {1}}, boatList.get(0).getDims());
Assert.assertArrayEquals(new int[][]{{1}, {2}, {0}, {0}, {1}, {1}, {2}}, boatList.get(1).getDims());
Assert.assertArrayEquals(new int[][]{{0}, {0}, {1}, {1}, {2}, {2}, {3}}, boatList.get(2).getDims());
Assert.assertArrayEquals(new int[][]{{0}, {0}, {0}, {2}, {0}, {3}, {4}}, boatList.get(3).getDims());
Assert.assertArrayEquals(new int[][]{{0}, {1}, {0}, {0}, {0}, {0}, {0}}, boatList.get(0).getDims());
Assert.assertArrayEquals(new int[][]{{1}, {2}, {0}, {0}, {1}, {1}, {1}}, boatList.get(1).getDims());
Assert.assertArrayEquals(new int[][]{{0}, {0}, {1}, {1}, {2}, {2}, {2}}, boatList.get(2).getDims());
Assert.assertArrayEquals(new int[][]{{0}, {0}, {0}, {2}, {0}, {3}, {3}}, boatList.get(3).getDims());

checkBitmapIndex(Lists.newArrayList(0, 2, 3), adapter.getBitmapIndex("d2", ""));
checkBitmapIndex(Lists.newArrayList(1), adapter.getBitmapIndex("d2", "210"));
Expand Down