IndexGeneratorJob.java
@@ -69,6 +69,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

@@ -199,7 +201,8 @@ public boolean run()
private static IncrementalIndex makeIncrementalIndex(
Bucket theBucket,
AggregatorFactory[] aggs,
HadoopDruidIndexerConfig config
HadoopDruidIndexerConfig config,
Iterable<String> oldDimOrder
)
{
final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig();
@@ -210,10 +213,16 @@ private static IncrementalIndex makeIncrementalIndex(
.withMetrics(aggs)
.build();

return new OnheapIncrementalIndex(
OnheapIncrementalIndex newIndex = new OnheapIncrementalIndex(
indexSchema,
tuningConfig.getRowFlushBoundary()
);

if (oldDimOrder != null && !indexSchema.getDimensionsSpec().hasCustomDimensions()) {
newIndex.loadDimensionIterable(oldDimOrder);
}

return newIndex;
}

public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, BytesWritable>
@@ -310,19 +319,21 @@ protected void reduce(
BytesWritable first = iter.next();

if (iter.hasNext()) {
LinkedHashSet<String> dimOrder = Sets.newLinkedHashSet();
SortableBytes keyBytes = SortableBytes.fromBytesWritable(key);
Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs;
IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, config);
IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, config, null);
index.add(InputRowSerde.fromBytes(first.getBytes(), aggregators));

while (iter.hasNext()) {
context.progress();
InputRow value = InputRowSerde.fromBytes(iter.next().getBytes(), aggregators);

if (!index.canAppendRow()) {
dimOrder.addAll(index.getDimensionOrder());
log.info("current index full due to [%s]. creating new index.", index.getOutOfRowsReason());
flushIndexToContextAndClose(key, index, context);
index = makeIncrementalIndex(bucket, combiningAggs, config);
index = makeIncrementalIndex(bucket, combiningAggs, config, dimOrder);
}

index.add(value);
@@ -523,7 +534,8 @@ protected void reduce(
IncrementalIndex index = makeIncrementalIndex(
bucket,
combiningAggs,
config
config,
null
);
try {
File baseFlushFile = File.createTempFile("base", "flush");
@@ -536,19 +548,20 @@
int runningTotalLineCount = 0;
long startTime = System.currentTimeMillis();

Set<String> allDimensionNames = Sets.newHashSet();
Set<String> allDimensionNames = Sets.newLinkedHashSet();
xvrl (Member):
This set gets populated by inputRow.getDimensions(), which depends on the order in which rows appear. Are we sure the order gets preserved here? If I ask for [a,b,c] in my indexing spec, and my first row only has dimension c, my second row only has dimension b, and my third row only has dimension a, then this set may end up as [c,b,a] instead of [a,b,c].
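For illustration, a minimal standalone sketch of this concern (hypothetical class and data, not part of the patch): a LinkedHashSet keeps first-insertion order, so it records the order rows happen to arrive in, not the spec order.

import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class DimOrderSketch
{
  public static void main(String[] args)
  {
    // Hypothetical discovered-dimension lists, one per row, arriving in reverse spec order.
    List<List<String>> perRowDims = Arrays.asList(
        Arrays.asList("c"), // first row only has dimension c
        Arrays.asList("b"), // second row only has dimension b
        Arrays.asList("a")  // third row only has dimension a
    );

    Set<String> allDimensionNames = new LinkedHashSet<>();
    for (List<String> dims : perRowDims) {
      allDimensionNames.addAll(dims); // mirrors allDimensionNames.addAll(inputRow.getDimensions())
    }

    System.out.println(allDimensionNames); // prints [c, b, a], not the spec order [a, b, c]
  }
}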

jon-wei (Contributor Author):
@xvrl In the case where the user has specified a list of dimensions in the indexing spec, the MapInputRowParser initializes all of the MapBasedInputRows that it creates with the specified dimension list, so inputRow.getDimensions() would return the same order every time.
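As a plain-Java illustration of that behavior (a hypothetical stand-in, not the real MapBasedInputRow API): each row is handed the same spec-supplied dimension list, so the reported order never varies with the event's keys.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class SpecDimRowSketch
{
  // Stand-in for a row built by the parser: it carries the spec's dimension list.
  static class Row
  {
    private final List<String> specDims;
    private final Map<String, Object> event;

    Row(List<String> specDims, Map<String, Object> event)
    {
      this.specDims = specDims;
      this.event = event;
    }

    List<String> getDimensions()
    {
      return specDims; // always the spec order, regardless of which keys the event contains
    }
  }

  public static void main(String[] args)
  {
    List<String> specDims = Arrays.asList("a", "b", "c");
    Row r1 = new Row(specDims, Collections.<String, Object>singletonMap("c", "x.example.com"));
    Row r2 = new Row(specDims, Collections.<String, Object>singletonMap("b", "y.example.com"));
    System.out.println(r1.getDimensions()); // [a, b, c]
    System.out.println(r2.getDimensions()); // [a, b, c]
  }
}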

xvrl (Member):
@jon-wei does it always return the same dimensions for every single row, and is that also the case for other InputRowParsers?

jon-wei (Contributor Author):
@xvrl Yes, every row returns the same dimensions if the user specifies the dimension list in the ingestion spec. There are only StringInputRowParser and MapInputRowParser, which are coupled together.

xvrl (Member):
in that case, why do we need to add the dimensions to the set for every single row?

jon-wei (Contributor Author):
@xvrl For the schemaless case, where dimensions are discovered on the fly.
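And a sketch of the schemaless side (hypothetical stand-in; assumes a discovered dimension list is just the row's own keys in encounter order), showing why the set has to be fed from every row:

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class SchemalessDimsSketch
{
  public static void main(String[] args)
  {
    // Two schemaless rows; each contributes only the dimensions it actually contains.
    Map<String, Object> row1 = new LinkedHashMap<>();
    row1.put("X", "x.example.com");
    Map<String, Object> row2 = new LinkedHashMap<>();
    row2.put("Y", "y.example.com");

    Set<String> allDimensionNames = new LinkedHashSet<>();
    for (Map<String, Object> row : Arrays.asList(row1, row2)) {
      // With no spec-supplied list, a row can only report its own keys,
      // so every row must be visited to discover the full dimension set.
      allDimensionNames.addAll(row.keySet());
    }

    System.out.println(allDimensionNames); // [X, Y] -- the order the dimensions were first seen
  }
}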

xvrl (Member):
@jon-wei do we have a test for Hadoop indexing that covers both:

  • schema-less indexing, to make sure that dimensions are persisted in the order they were seen
  • schema-full indexing, where we ensure the order in which dimensions appear when read is different from the one specified in the spec, and we test that the persisted order corresponds to the spec?

jon-wei (Contributor Author):
@xvrl The test I added to IndexGeneratorJobTest covers the first case; I'll add another test that covers the second.

jon-wei (Contributor Author):
@xvrl Added a test to IndexGeneratorJobTest that checks the original order is maintained for schema-full indexing.
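Putting the two cases together, a standalone sketch of the inheritance mechanism itself (a hypothetical mini-model of the reducer, with maxRowsInMemory forced to 1 as in the new tests): each new index is seeded with the dimension order accumulated so far, so even per-row flushes yield one stable order.

import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class DimOrderInheritanceSketch
{
  public static void main(String[] args)
  {
    // One dimension per row, as in the new inherit_dims test data.
    List<String> rows = Arrays.asList("X", "Y", "M", "Q", "B", "F");

    Set<String> allDimensionNames = new LinkedHashSet<>();
    for (String dim : rows) {
      // Model of one single-row index: seeded with the accumulated order
      // (makeIncrementalIndex(..., oldDimOrder)), then fed this row's dimension.
      Set<String> indexDimOrder = new LinkedHashSet<>(allDimensionNames);
      indexDimOrder.add(dim);
      // Before flushing, remember the order so far (index.getDimensionOrder()).
      allDimensionNames.addAll(indexDimOrder);
    }

    System.out.println(allDimensionNames); // [X, Y, M, Q, B, F] -- stable across flushes
  }
}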

final ProgressIndicator progressIndicator = makeProgressIndicator(context);

for (final BytesWritable bw : values) {
context.progress();

final InputRow inputRow = index.formatRow(InputRowSerde.fromBytes(bw.getBytes(), aggregators));
allDimensionNames.addAll(inputRow.getDimensions());
int numRows = index.add(inputRow);

++lineCount;

if (!index.canAppendRow()) {
allDimensionNames.addAll(index.getDimensionOrder());

log.info(index.getOutOfRowsReason());
log.info(
"%,d lines to %,d rows in %,d millis",
@@ -569,13 +582,16 @@
index = makeIncrementalIndex(
bucket,
combiningAggs,
config
config,
allDimensionNames
);
startTime = System.currentTimeMillis();
++indexCount;
}
}

allDimensionNames.addAll(index.getDimensionOrder());

log.info("%,d lines completed.", lineCount);

List<QueryableIndex> indexes = Lists.newArrayListWithCapacity(indexCount);
IndexGeneratorJobTest.java
@@ -30,10 +30,12 @@
import io.druid.data.input.impl.CSVParseSpec;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.InputRowParser;
import io.druid.data.input.impl.JSONParseSpec;
import io.druid.data.input.impl.StringInputRowParser;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
@@ -76,9 +78,18 @@
@RunWith(Parameterized.class)
public class IndexGeneratorJobTest
{

@Parameterized.Parameters(name = "partitionType={0}, interval={1}, shardInfoForEachSegment={2}, data={3}, " +
"inputFormatName={4}, buildV9Directly={5}")
final private static AggregatorFactory[] aggs1 = {
new LongSumAggregatorFactory("visited_num", "visited_num"),
new HyperUniquesAggregatorFactory("unique_hosts", "host")
};

final private static AggregatorFactory[] aggs2 = {
new CountAggregatorFactory("count")
};

@Parameterized.Parameters(name = "useCombiner={0}, partitionType={1}, interval={2}, shardInfoForEachSegment={3}, " +
"data={4}, inputFormatName={5}, inputRowParser={6}, maxRowsInMemory={7}, " +
"aggs={8}, datasourceName={9}, buildV9Directly={10}")
public static Collection<Object[]> constructFeed()
{
final List<Object[]> baseConstructors = Arrays.asList(
@@ -133,7 +144,10 @@ public static Collection<Object[]> constructFeed()
null,
ImmutableList.of("timestamp", "host", "visited_num")
)
)
),
null,
aggs1,
"website"
},
{
false,
@@ -175,7 +189,10 @@ public static Collection<Object[]> constructFeed()
null,
ImmutableList.of("timestamp", "host", "visited_num")
)
)
),
null,
aggs1,
"website"
},
{
true,
@@ -217,7 +234,10 @@ public static Collection<Object[]> constructFeed()
null,
ImmutableList.of("timestamp", "host", "visited_num")
)
)
),
null,
aggs1,
"website"
},
{
false,
@@ -269,7 +289,68 @@ public static Collection<Object[]> constructFeed()
null,
ImmutableList.of("timestamp", "host", "visited_num")
)
)
),
null,
aggs1,
"website"
},
{
// Tests that new indexes inherit the dimension order from previous index
false,
"hashed",
"2014-10-22T00:00:00Z/P1D",
new Integer[][][]{
{
{0, 1} // use a single partition, dimension order inheritance is not supported across partitions
}
},
ImmutableList.of(
"{\"ts\":\"2014102200\", \"X\":\"x.example.com\"}",
"{\"ts\":\"2014102201\", \"Y\":\"y.example.com\"}",
"{\"ts\":\"2014102202\", \"M\":\"m.example.com\"}",
"{\"ts\":\"2014102203\", \"Q\":\"q.example.com\"}",
"{\"ts\":\"2014102204\", \"B\":\"b.example.com\"}",
"{\"ts\":\"2014102205\", \"F\":\"f.example.com\"}"
),
null,
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("ts", "yyyyMMddHH", null),
new DimensionsSpec(null, null, null)
)
),
1, // force 1 row max per index for easier testing
aggs2,
"inherit_dims"
},
{
// Tests that pre-specified dim order is maintained across indexes.
false,
"hashed",
"2014-10-22T00:00:00Z/P1D",
new Integer[][][]{
{
{0, 1}
}
},
ImmutableList.of(
"{\"ts\":\"2014102200\", \"X\":\"x.example.com\"}",
"{\"ts\":\"2014102201\", \"Y\":\"y.example.com\"}",
"{\"ts\":\"2014102202\", \"M\":\"m.example.com\"}",
"{\"ts\":\"2014102203\", \"Q\":\"q.example.com\"}",
"{\"ts\":\"2014102204\", \"B\":\"b.example.com\"}",
"{\"ts\":\"2014102205\", \"F\":\"f.example.com\"}"
),
null,
new StringInputRowParser(
new JSONParseSpec(
new TimestampSpec("ts", "yyyyMMddHH", null),
new DimensionsSpec(ImmutableList.of("B", "F", "M", "Q", "X", "Y"), null, null)
)
),
1, // force 1 row max per index for easier testing
aggs2,
"inherit_dims2"
}
}
);
@@ -300,6 +381,9 @@ public static Collection<Object[]> constructFeed()
private final List<String> data;
private final String inputFormatName;
private final InputRowParser inputRowParser;
private final Integer maxRowsInMemory;
private final AggregatorFactory[] aggs;
private final String datasourceName;
private final boolean buildV9Directly;

private ObjectMapper mapper;
@@ -315,8 +399,11 @@ public IndexGeneratorJobTest(
List<String> data,
String inputFormatName,
InputRowParser inputRowParser,
Integer maxRowsInMemory,
AggregatorFactory[] aggs,
String datasourceName,
boolean buildV9Directly
) throws IOException
{
this.useCombiner = useCombiner;
this.partitionType = partitionType;
@@ -325,6 +412,9 @@ public IndexGeneratorJobTest(
this.data = data;
this.inputFormatName = inputFormatName;
this.inputRowParser = inputRowParser;
this.maxRowsInMemory = maxRowsInMemory;
this.aggs = aggs;
this.datasourceName = datasourceName;
this.buildV9Directly = buildV9Directly;
}

@@ -381,15 +471,12 @@ public void setUp() throws Exception
config = new HadoopDruidIndexerConfig(
new HadoopIngestionSpec(
new DataSchema(
"website",
datasourceName,
mapper.convertValue(
inputRowParser,
Map.class
),
new AggregatorFactory[]{
new LongSumAggregatorFactory("visited_num", "visited_num"),
new HyperUniquesAggregatorFactory("unique_hosts", "host")
},
aggs,
new UniformGranularitySpec(
Granularity.DAY, QueryGranularity.NONE, ImmutableList.of(this.interval)
),
@@ -406,7 +493,7 @@ public void setUp() throws Exception
null,
null,
null,
null,
maxRowsInMemory,
false,
false,
false,
@@ -500,15 +587,29 @@ private void verifyJob(IndexGeneratorJob job) throws IOException
Assert.assertTrue(indexZip.exists());

DataSegment dataSegment = mapper.readValue(descriptor, DataSegment.class);
Assert.assertEquals("website", dataSegment.getDataSource());
Assert.assertEquals(config.getSchema().getTuningConfig().getVersion(), dataSegment.getVersion());
Assert.assertEquals(new Interval(currTime, currTime.plusDays(1)), dataSegment.getInterval());
Assert.assertEquals("local", dataSegment.getLoadSpec().get("type"));
Assert.assertEquals(indexZip.getCanonicalPath(), dataSegment.getLoadSpec().get("path"));
Assert.assertEquals("host", dataSegment.getDimensions().get(0));
Assert.assertEquals("visited_num", dataSegment.getMetrics().get(0));
Assert.assertEquals("unique_hosts", dataSegment.getMetrics().get(1));
Assert.assertEquals(Integer.valueOf(9), dataSegment.getBinaryVersion());

if (datasourceName.equals("website")) {
Assert.assertEquals("website", dataSegment.getDataSource());
Assert.assertEquals("host", dataSegment.getDimensions().get(0));
Assert.assertEquals("visited_num", dataSegment.getMetrics().get(0));
Assert.assertEquals("unique_hosts", dataSegment.getMetrics().get(1));
} else if (datasourceName.equals("inherit_dims")) {
Assert.assertEquals("inherit_dims", dataSegment.getDataSource());
Assert.assertEquals(ImmutableList.of("X", "Y", "M", "Q", "B", "F"), dataSegment.getDimensions());
Assert.assertEquals("count", dataSegment.getMetrics().get(0));
} else if (datasourceName.equals("inherit_dims2")) {
Assert.assertEquals("inherit_dims2", dataSegment.getDataSource());
Assert.assertEquals(ImmutableList.of("B", "F", "M", "Q", "X", "Y"), dataSegment.getDimensions());
Assert.assertEquals("count", dataSegment.getMetrics().get(0));
} else {
Assert.fail("Test did not specify supported datasource name");
}

if (partitionType.equals("hashed")) {
Integer[] hashShardInfo = (Integer[]) shardInfo[partitionNum];
HashBasedNumberedShardSpec spec = (HashBasedNumberedShardSpec) dataSegment.getShardSpec();
processing/src/main/java/io/druid/segment/IndexIO.java (3 changes: 2 additions & 1 deletion)
@@ -610,6 +610,7 @@ public void convertV8toV9(File v8Dir, File v9Dir, IndexSpec indexSpec)
continue;
}

int emptyStrIdx = dictionary.indexOf("");
List<Integer> singleValCol = null;
VSizeIndexed multiValCol = VSizeIndexed.readFromByteBuffer(dimBuffer.asReadOnlyBuffer());
GenericIndexed<ImmutableBitmap> bitmaps = bitmapIndexes.get(dimension);
Expand All @@ -626,7 +627,7 @@ public void convertV8toV9(File v8Dir, File v9Dir, IndexSpec indexSpec)
if (rowValue.size() > 1) {
onlyOneValue = false;
}
if (rowValue.size() == 0) {
if (rowValue.size() == 0 || rowValue.get(0) == emptyStrIdx) {
if (nullsSet == null) {
nullsSet = bitmapFactory.makeEmptyMutableBitmap();
}