@@ -81,7 +81,13 @@ public boolean isSplittable()
@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
SettableByteEntity<KafkaRecordEntity> settableByteEntitySource = (SettableByteEntity<KafkaRecordEntity>) source;
final SettableByteEntity<KafkaRecordEntity> settableByteEntitySource;
if (source instanceof SettableByteEntity) {
settableByteEntitySource = (SettableByteEntity<KafkaRecordEntity>) source;
} else {
settableByteEntitySource = new SettableByteEntity<>();
settableByteEntitySource.setEntity((KafkaRecordEntity) source);
}
InputRowSchema newInputRowSchema = new InputRowSchema(
dummyTimestampSpec,
inputRowSchema.getDimensionsSpec(),
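Context for the hunk above (which appears to be KafkaInputFormat#createReader): the old code cast `source` unconditionally to SettableByteEntity<KafkaRecordEntity>, which only the streaming ingestion path provides, while the sampler hands the reader a bare KafkaRecordEntity. A hedged sketch of the two call shapes; every identifier other than the class names already in the diff (format, rowSchema, record, tmpDir) is a placeholder, not code from this PR:

// Streaming ingestion path: the record is pre-wrapped before createReader() is called.
SettableByteEntity<KafkaRecordEntity> wrapped = new SettableByteEntity<>();
wrapped.setEntity(record);
InputEntityReader taskReader = format.createReader(rowSchema, wrapped, tmpDir);

// Sampler path: the bare KafkaRecordEntity is passed straight through.
// Before this change, the unconditional cast threw a ClassCastException here;
// the new instanceof check wraps the entity on the fly instead.
InputEntityReader samplerReader = format.createReader(rowSchema, record, tmpDir);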
@@ -89,6 +89,43 @@ public KafkaInputReader(
public CloseableIterator<InputRow> read() throws IOException
{
final KafkaRecordEntity record = source.getEntity();
final Map<String, Object> mergedHeaderMap = extractHeaderAndKeys(record);

// Ignore tombstone records that have null values.
if (record.getRecord().value() != null) {
return buildBlendedRows(valueParser, mergedHeaderMap);
} else {
return CloseableIterators.withEmptyBaggage(buildInputRowsForMap(mergedHeaderMap).iterator());
}
}

@Override
public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
{
final KafkaRecordEntity record = source.getEntity();
InputRowListPlusRawValues keysAndHeader = extractHeaderAndKeysSample(record);
if (record.getRecord().value() != null) {
return buildBlendedRowsSample(valueParser, keysAndHeader.getRawValues());
} else {
final List<InputRowListPlusRawValues> rows = Collections.singletonList(keysAndHeader);
return CloseableIterators.withEmptyBaggage(rows.iterator());
}
}

private List<String> getFinalDimensionList(Set<String> newDimensions)
{
final List<String> schemaDimensions = inputRowSchema.getDimensionsSpec().getDimensionNames();
if (!schemaDimensions.isEmpty()) {
return schemaDimensions;
} else {
return Lists.newArrayList(
Sets.difference(newDimensions, inputRowSchema.getDimensionsSpec().getDimensionExclusions())
);
}
}

private Map<String, Object> extractHeader(KafkaRecordEntity record)
{
final Map<String, Object> mergedHeaderMap = new HashMap<>();
if (headerParserSupplier != null) {
KafkaHeaderReader headerParser = headerParserSupplier.apply(record);
@@ -102,7 +139,13 @@ public CloseableIterator<InputRow> read() throws IOException
// the header list
mergedHeaderMap.putIfAbsent(timestampColumnName, record.getRecord().timestamp());

InputEntityReader keyParser = (keyParserSupplier == null) ? null : keyParserSupplier.apply(record);
return mergedHeaderMap;
}

private Map<String, Object> extractHeaderAndKeys(KafkaRecordEntity record) throws IOException
{
final Map<String, Object> mergedHeaderMap = extractHeader(record);
final InputEntityReader keyParser = (keyParserSupplier == null) ? null : keyParserSupplier.apply(record);
if (keyParser != null) {
try (CloseableIterator<InputRow> keyIterator = keyParser.read()) {
// Key currently only takes the first row and ignores the rest.
@@ -123,31 +166,7 @@ public CloseableIterator<InputRow> read() throws IOException
);
}
}

// Ignore tombstone records that have null values.
if (record.getRecord().value() != null) {
return buildBlendedRows(valueParser, mergedHeaderMap);
} else {
return buildRowsWithoutValuePayload(mergedHeaderMap);
}
}

@Override
public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
{
return read().map(row -> InputRowListPlusRawValues.of(row, ((MapBasedInputRow) row).getEvent()));
}

private List<String> getFinalDimensionList(Set<String> newDimensions)
{
final List<String> schemaDimensions = inputRowSchema.getDimensionsSpec().getDimensionNames();
if (!schemaDimensions.isEmpty()) {
return schemaDimensions;
} else {
return Lists.newArrayList(
Sets.difference(newDimensions, inputRowSchema.getDimensionsSpec().getDimensionExclusions())
);
}
return mergedHeaderMap;
}

private CloseableIterator<InputRow> buildBlendedRows(
@@ -185,15 +204,91 @@ private CloseableIterator<InputRow> buildBlendedRows(
);
}

private CloseableIterator<InputRow> buildRowsWithoutValuePayload(Map<String, Object> headerKeyList)
private InputRowListPlusRawValues extractHeaderAndKeysSample(KafkaRecordEntity record) throws IOException
{
Map<String, Object> mergedHeaderMap = extractHeader(record);
InputEntityReader keyParser = (keyParserSupplier == null) ? null : keyParserSupplier.apply(record);
if (keyParser != null) {
try (CloseableIterator<InputRowListPlusRawValues> keyIterator = keyParser.sample()) {
// Key currently only takes the first row and ignores the rest.
if (keyIterator.hasNext()) {
// Return type for the key parser should be of type MapBasedInputRow
// Parsers returning other types are not compatible currently.
InputRowListPlusRawValues keyRow = keyIterator.next();
// Add the key to the merged map only if the key column is not already present
mergedHeaderMap.putIfAbsent(
keyColumnName,
keyRow.getRawValues().entrySet().stream().findFirst().get().getValue()
);
return InputRowListPlusRawValues.of(buildInputRowsForMap(mergedHeaderMap), mergedHeaderMap);
}
}
catch (ClassCastException e) {
throw new IOException(
"Unsupported keyFormat. KafkaInputformat only supports input format that return MapBasedInputRow rows"
);
}
}
return InputRowListPlusRawValues.of(buildInputRowsForMap(mergedHeaderMap), mergedHeaderMap);
}

private CloseableIterator<InputRowListPlusRawValues> buildBlendedRowsSample(
InputEntityReader valueParser,
Map<String, Object> headerKeyList
) throws IOException
{
return valueParser.sample().map(
rowAndValues -> {
if (rowAndValues.getParseException() != null) {
return rowAndValues;
}
List<InputRow> newInputRows = Lists.newArrayListWithCapacity(rowAndValues.getInputRows().size());
List<Map<String, Object>> newRawRows = Lists.newArrayListWithCapacity(rowAndValues.getRawValues().size());
ParseException parseException = null;

for (Map<String, Object> raw : rowAndValues.getRawValuesList()) {
newRawRows.add(buildBlendedEventMap(raw, headerKeyList));
}
for (InputRow r : rowAndValues.getInputRows()) {
MapBasedInputRow valueRow = null;
try {
valueRow = (MapBasedInputRow) r;
}
catch (ClassCastException e) {
parseException = new ParseException(
null,
"Unsupported input format in valueFormat. KafkaInputFormat only supports input format that return MapBasedInputRow rows"
);
}
if (valueRow != null) {
final Map<String, Object> event = buildBlendedEventMap(valueRow.getEvent(), headerKeyList);
final HashSet<String> newDimensions = new HashSet<>(valueRow.getDimensions());
newDimensions.addAll(headerKeyList.keySet());
// Remove the dummy timestamp added in KafkaInputFormat
newDimensions.remove(KafkaInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING);
newInputRows.add(
new MapBasedInputRow(
inputRowSchema.getTimestampSpec().extractTimestamp(event),
getFinalDimensionList(newDimensions),
event
)
);
}
}
return InputRowListPlusRawValues.ofList(newRawRows, newInputRows, parseException);
}
);
}

private List<InputRow> buildInputRowsForMap(Map<String, Object> headerKeyList)
{
final InputRow row = new MapBasedInputRow(
inputRowSchema.getTimestampSpec().extractTimestamp(headerKeyList),
getFinalDimensionList(headerKeyList.keySet()),
headerKeyList
return Collections.singletonList(
new MapBasedInputRow(
inputRowSchema.getTimestampSpec().extractTimestamp(headerKeyList),
getFinalDimensionList(headerKeyList.keySet()),
headerKeyList
)
);
final List<InputRow> rows = Collections.singletonList(row);
return CloseableIterators.withEmptyBaggage(rows.iterator());
}

/**
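The read and sample paths above both funnel header, key, and value fields through buildBlendedEventMap(...), which is unchanged by this PR and therefore not shown in the diff. A minimal sketch of the merge it is assumed to perform, under the assumption that the value row's fields win over header/key fields on a name collision:

// Hedged sketch of the blended-event merge referenced above; the real method lives
// outside the changed lines. Assumption: start from the header/key fields and let
// the value row's fields overwrite them when names collide.
private static Map<String, Object> buildBlendedEventMap(
    Map<String, Object> valueFields,
    Map<String, Object> headerAndKeyFields
)
{
  final Map<String, Object> event = new HashMap<>(headerAndKeyFields);
  event.putAll(valueFields);
  return event;
}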
@@ -34,6 +34,7 @@
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.data.input.kafkainput.KafkaInputFormat;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
import org.apache.druid.indexing.kafka.test.TestBroker;
@@ -94,6 +95,26 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
null
);

private static final DataSchema DATA_SCHEMA_KAFKA_TIMESTAMP = new DataSchema(
"test_ds",
new TimestampSpec("kafka.timestamp", "iso", null),
new DimensionsSpec(
Arrays.asList(
new StringDimensionSchema("dim1"),
new StringDimensionSchema("dim1t"),
new StringDimensionSchema("dim2"),
new LongDimensionSchema("dimLong"),
new FloatDimensionSchema("dimFloat")
)
),
new AggregatorFactory[]{
new DoubleSumAggregatorFactory("met1sum", "met1"),
new CountAggregatorFactory("rows")
},
new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
null
);

private static TestingCluster zkServer;
private static TestBroker kafkaServer;

@@ -126,7 +147,7 @@ public static void tearDownClass() throws Exception
zkServer.stop();
}

@Test(timeout = 30_000L)
@Test
public void testSample()
{
insertData(generateRecords(TOPIC));
@@ -169,14 +190,98 @@ public void testSample()

KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
supervisorSpec,
new SamplerConfig(5, null, null, null),
new SamplerConfig(5, 5_000, null, null),
new InputSourceSampler(OBJECT_MAPPER),
OBJECT_MAPPER
);

runSamplerAndCompareResponse(samplerSpec, true);
}

@Test
public void testSampleKafkaInputFormat()
{
insertData(generateRecords(TOPIC));

KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec(
null,
DATA_SCHEMA_KAFKA_TIMESTAMP,
null,
new KafkaSupervisorIOConfig(
TOPIC,
new KafkaInputFormat(
null,
null,
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null,
null,
null
),

null,
null,
null,
kafkaServer.consumerProperties(),
null,
null,
null,
null,
true,
null,
null,
null,
null,
null,
null
),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
);

KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
supervisorSpec,
new SamplerConfig(5, 5_000, null, null),
new InputSourceSampler(OBJECT_MAPPER),
OBJECT_MAPPER
);

SamplerResponse response = samplerSpec.sample();

Assert.assertEquals(5, response.getNumRowsRead());
// We can parse one extra row compared to the other generated-data samples because the
// Kafka record timestamp is used as the row timestamp.
Assert.assertEquals(4, response.getNumRowsIndexed());
Assert.assertEquals(5, response.getData().size());

Iterator<SamplerResponse.SamplerResponseRow> it = response.getData().iterator();

SamplerResponse.SamplerResponseRow nextRow;
Map<String, Object> rawInput;
Map<String, Object> parsedInput;

for (int i = 0; i < 4; i++) {
nextRow = it.next();
Assert.assertNull(nextRow.isUnparseable());
rawInput = nextRow.getInput();
parsedInput = nextRow.getParsed();
Assert.assertTrue(rawInput.containsKey("kafka.timestamp"));
Assert.assertEquals(rawInput.get("kafka.timestamp"), parsedInput.get("__time"));
}
nextRow = it.next();
Assert.assertTrue(nextRow.isUnparseable());

Assert.assertFalse(it.hasNext());
}

@Test
public void testWithInputRowParser() throws IOException
{
@@ -245,7 +350,7 @@ public void testWithInputRowParser() throws IOException

KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
supervisorSpec,
new SamplerConfig(5, null, null, null),
new SamplerConfig(5, 5_000, null, null),
new InputSourceSampler(new DefaultObjectMapper()),
OBJECT_MAPPER
);
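The new testSampleKafkaInputFormat above relies on KafkaInputFormat's default column names: with the header, key, and timestamp settings left null, the record's Kafka timestamp is surfaced as "kafka.timestamp", which the added DATA_SCHEMA_KAFKA_TIMESTAMP schema maps to __time. A hedged sketch of that pairing, reusing only constructor calls that already appear in the diff:

// Schema side: read __time from the "kafka.timestamp" column.
TimestampSpec timestampSpec = new TimestampSpec("kafka.timestamp", "iso", null);

// Format side: defaults everywhere except the JSON value format, so the Kafka
// record timestamp is exposed under "kafka.timestamp" (see the test's assertions).
InputFormat inputFormat = new KafkaInputFormat(
    null,
    null,
    new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
    null,
    null,
    null
);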