diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java index aad06bac7b5b..8f86ff07ded9 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java @@ -81,7 +81,13 @@ public boolean isSplittable() @Override public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) { - SettableByteEntity settableByteEntitySource = (SettableByteEntity) source; + final SettableByteEntity settableByteEntitySource; + if (source instanceof SettableByteEntity) { + settableByteEntitySource = (SettableByteEntity) source; + } else { + settableByteEntitySource = new SettableByteEntity<>(); + settableByteEntitySource.setEntity((KafkaRecordEntity) source); + } InputRowSchema newInputRowSchema = new InputRowSchema( dummyTimestampSpec, inputRowSchema.getDimensionsSpec(), diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java index 56e94215940b..6e6ac8fa10e9 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java @@ -89,6 +89,43 @@ public KafkaInputReader( public CloseableIterator read() throws IOException { final KafkaRecordEntity record = source.getEntity(); + final Map mergedHeaderMap = extractHeaderAndKeys(record); + + // Ignore tombstone records that have null values. + if (record.getRecord().value() != null) { + return buildBlendedRows(valueParser, mergedHeaderMap); + } else { + return CloseableIterators.withEmptyBaggage(buildInputRowsForMap(mergedHeaderMap).iterator()); + } + } + + @Override + public CloseableIterator sample() throws IOException + { + final KafkaRecordEntity record = source.getEntity(); + InputRowListPlusRawValues keysAndHeader = extractHeaderAndKeysSample(record); + if (record.getRecord().value() != null) { + return buildBlendedRowsSample(valueParser, keysAndHeader.getRawValues()); + } else { + final List rows = Collections.singletonList(keysAndHeader); + return CloseableIterators.withEmptyBaggage(rows.iterator()); + } + } + + private List getFinalDimensionList(Set newDimensions) + { + final List schemaDimensions = inputRowSchema.getDimensionsSpec().getDimensionNames(); + if (!schemaDimensions.isEmpty()) { + return schemaDimensions; + } else { + return Lists.newArrayList( + Sets.difference(newDimensions, inputRowSchema.getDimensionsSpec().getDimensionExclusions()) + ); + } + } + + private Map extractHeader(KafkaRecordEntity record) + { final Map mergedHeaderMap = new HashMap<>(); if (headerParserSupplier != null) { KafkaHeaderReader headerParser = headerParserSupplier.apply(record); @@ -102,7 +139,13 @@ public CloseableIterator read() throws IOException // the header list mergedHeaderMap.putIfAbsent(timestampColumnName, record.getRecord().timestamp()); - InputEntityReader keyParser = (keyParserSupplier == null) ? null : keyParserSupplier.apply(record); + return mergedHeaderMap; + } + + private Map extractHeaderAndKeys(KafkaRecordEntity record) throws IOException + { + final Map mergedHeaderMap = extractHeader(record); + final InputEntityReader keyParser = (keyParserSupplier == null) ? null : keyParserSupplier.apply(record); if (keyParser != null) { try (CloseableIterator keyIterator = keyParser.read()) { // Key currently only takes the first row and ignores the rest. @@ -123,31 +166,7 @@ public CloseableIterator read() throws IOException ); } } - - // Ignore tombstone records that have null values. - if (record.getRecord().value() != null) { - return buildBlendedRows(valueParser, mergedHeaderMap); - } else { - return buildRowsWithoutValuePayload(mergedHeaderMap); - } - } - - @Override - public CloseableIterator sample() throws IOException - { - return read().map(row -> InputRowListPlusRawValues.of(row, ((MapBasedInputRow) row).getEvent())); - } - - private List getFinalDimensionList(Set newDimensions) - { - final List schemaDimensions = inputRowSchema.getDimensionsSpec().getDimensionNames(); - if (!schemaDimensions.isEmpty()) { - return schemaDimensions; - } else { - return Lists.newArrayList( - Sets.difference(newDimensions, inputRowSchema.getDimensionsSpec().getDimensionExclusions()) - ); - } + return mergedHeaderMap; } private CloseableIterator buildBlendedRows( @@ -185,15 +204,91 @@ private CloseableIterator buildBlendedRows( ); } - private CloseableIterator buildRowsWithoutValuePayload(Map headerKeyList) + private InputRowListPlusRawValues extractHeaderAndKeysSample(KafkaRecordEntity record) throws IOException + { + Map mergedHeaderMap = extractHeader(record); + InputEntityReader keyParser = (keyParserSupplier == null) ? null : keyParserSupplier.apply(record); + if (keyParser != null) { + try (CloseableIterator keyIterator = keyParser.sample()) { + // Key currently only takes the first row and ignores the rest. + if (keyIterator.hasNext()) { + // Return type for the key parser should be of type MapBasedInputRow + // Parsers returning other types are not compatible currently. + InputRowListPlusRawValues keyRow = keyIterator.next(); + // Add the key to the mergeList only if the key string is not already present + mergedHeaderMap.putIfAbsent( + keyColumnName, + keyRow.getRawValues().entrySet().stream().findFirst().get().getValue() + ); + return InputRowListPlusRawValues.of(buildInputRowsForMap(mergedHeaderMap), mergedHeaderMap); + } + } + catch (ClassCastException e) { + throw new IOException( + "Unsupported keyFormat. KafkaInputformat only supports input format that return MapBasedInputRow rows" + ); + } + } + return InputRowListPlusRawValues.of(buildInputRowsForMap(mergedHeaderMap), mergedHeaderMap); + } + + private CloseableIterator buildBlendedRowsSample( + InputEntityReader valueParser, + Map headerKeyList + ) throws IOException + { + return valueParser.sample().map( + rowAndValues -> { + if (rowAndValues.getParseException() != null) { + return rowAndValues; + } + List newInputRows = Lists.newArrayListWithCapacity(rowAndValues.getInputRows().size()); + List> newRawRows = Lists.newArrayListWithCapacity(rowAndValues.getRawValues().size()); + ParseException parseException = null; + + for (Map raw : rowAndValues.getRawValuesList()) { + newRawRows.add(buildBlendedEventMap(raw, headerKeyList)); + } + for (InputRow r : rowAndValues.getInputRows()) { + MapBasedInputRow valueRow = null; + try { + valueRow = (MapBasedInputRow) r; + } + catch (ClassCastException e) { + parseException = new ParseException( + null, + "Unsupported input format in valueFormat. KafkaInputFormat only supports input format that return MapBasedInputRow rows" + ); + } + if (valueRow != null) { + final Map event = buildBlendedEventMap(valueRow.getEvent(), headerKeyList); + final HashSet newDimensions = new HashSet<>(valueRow.getDimensions()); + newDimensions.addAll(headerKeyList.keySet()); + // Remove the dummy timestamp added in KafkaInputFormat + newDimensions.remove(KafkaInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING); + newInputRows.add( + new MapBasedInputRow( + inputRowSchema.getTimestampSpec().extractTimestamp(event), + getFinalDimensionList(newDimensions), + event + ) + ); + } + } + return InputRowListPlusRawValues.ofList(newRawRows, newInputRows, parseException); + } + ); + } + + private List buildInputRowsForMap(Map headerKeyList) { - final InputRow row = new MapBasedInputRow( - inputRowSchema.getTimestampSpec().extractTimestamp(headerKeyList), - getFinalDimensionList(headerKeyList.keySet()), - headerKeyList + return Collections.singletonList( + new MapBasedInputRow( + inputRowSchema.getTimestampSpec().extractTimestamp(headerKeyList), + getFinalDimensionList(headerKeyList.keySet()), + headerKeyList + ) ); - final List rows = Collections.singletonList(row); - return CloseableIterators.withEmptyBaggage(rows.iterator()); } /** diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java index 47327dbeee14..242e1fa4a121 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java @@ -34,6 +34,7 @@ import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.data.input.kafkainput.KafkaInputFormat; import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig; import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec; import org.apache.druid.indexing.kafka.test.TestBroker; @@ -94,6 +95,26 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest null ); + private static final DataSchema DATA_SCHEMA_KAFKA_TIMESTAMP = new DataSchema( + "test_ds", + new TimestampSpec("kafka.timestamp", "iso", null), + new DimensionsSpec( + Arrays.asList( + new StringDimensionSchema("dim1"), + new StringDimensionSchema("dim1t"), + new StringDimensionSchema("dim2"), + new LongDimensionSchema("dimLong"), + new FloatDimensionSchema("dimFloat") + ) + ), + new AggregatorFactory[]{ + new DoubleSumAggregatorFactory("met1sum", "met1"), + new CountAggregatorFactory("rows") + }, + new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null), + null + ); + private static TestingCluster zkServer; private static TestBroker kafkaServer; @@ -126,7 +147,7 @@ public static void tearDownClass() throws Exception zkServer.stop(); } - @Test(timeout = 30_000L) + @Test public void testSample() { insertData(generateRecords(TOPIC)); @@ -169,7 +190,7 @@ public void testSample() KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec( supervisorSpec, - new SamplerConfig(5, null, null, null), + new SamplerConfig(5, 5_000, null, null), new InputSourceSampler(OBJECT_MAPPER), OBJECT_MAPPER ); @@ -177,6 +198,90 @@ public void testSample() runSamplerAndCompareResponse(samplerSpec, true); } + @Test + public void testSampleKafkaInputFormat() + { + insertData(generateRecords(TOPIC)); + + KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec( + null, + DATA_SCHEMA_KAFKA_TIMESTAMP, + null, + new KafkaSupervisorIOConfig( + TOPIC, + new KafkaInputFormat( + null, + null, + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), + null, + null, + null + ), + + null, + null, + null, + kafkaServer.consumerProperties(), + null, + null, + null, + null, + true, + null, + null, + null, + null, + null, + null + ), + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + + KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec( + supervisorSpec, + new SamplerConfig(5, 5_000, null, null), + new InputSourceSampler(OBJECT_MAPPER), + OBJECT_MAPPER + ); + + SamplerResponse response = samplerSpec.sample(); + + Assert.assertEquals(5, response.getNumRowsRead()); + // we can parse an extra row compared to other generated data samples because we are using kafka timestamp + // for timestamp + Assert.assertEquals(4, response.getNumRowsIndexed()); + Assert.assertEquals(5, response.getData().size()); + + Iterator it = response.getData().iterator(); + + SamplerResponse.SamplerResponseRow nextRow; + Map rawInput; + Map parsedInput; + + for (int i = 0; i < 4; i++) { + nextRow = it.next(); + Assert.assertNull(nextRow.isUnparseable()); + rawInput = nextRow.getInput(); + parsedInput = nextRow.getParsed(); + Assert.assertTrue(rawInput.containsKey("kafka.timestamp")); + Assert.assertEquals(rawInput.get("kafka.timestamp"), parsedInput.get("__time")); + } + nextRow = it.next(); + Assert.assertTrue(nextRow.isUnparseable()); + + Assert.assertFalse(it.hasNext()); + } + @Test public void testWithInputRowParser() throws IOException { @@ -245,7 +350,7 @@ public void testWithInputRowParser() throws IOException KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec( supervisorSpec, - new SamplerConfig(5, null, null, null), + new SamplerConfig(5, 5_000, null, null), new InputSourceSampler(new DefaultObjectMapper()), OBJECT_MAPPER );