From 152c04445b7ded53ef117e633841596f75ff0cee Mon Sep 17 00:00:00 2001
From: Clint Wylie
Date: Tue, 7 Mar 2023 22:10:17 -0800
Subject: [PATCH 1/2] fix KafkaInputFormat when used with Sampler API

---
 .../input/kafkainput/KafkaInputFormat.java    |   8 +-
 .../input/kafkainput/KafkaInputReader.java    |  94 ++++++++++++---
 .../indexing/kafka/KafkaSamplerSpecTest.java  | 111 +++++++++++++++++-
 3 files changed, 192 insertions(+), 21 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
index aad06bac7b5b..8f86ff07ded9 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
@@ -81,7 +81,13 @@ public boolean isSplittable()
   @Override
   public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
   {
-    SettableByteEntity<KafkaRecordEntity> settableByteEntitySource = (SettableByteEntity<KafkaRecordEntity>) source;
+    final SettableByteEntity<KafkaRecordEntity> settableByteEntitySource;
+    if (source instanceof SettableByteEntity) {
+      settableByteEntitySource = (SettableByteEntity<KafkaRecordEntity>) source;
+    } else {
+      settableByteEntitySource = new SettableByteEntity<>();
+      settableByteEntitySource.setEntity((KafkaRecordEntity) source);
+    }
     InputRowSchema newInputRowSchema = new InputRowSchema(
         dummyTimestampSpec,
         inputRowSchema.getDimensionsSpec(),
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
index 56e94215940b..e9f5264919ea 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
@@ -89,6 +89,34 @@ public KafkaInputReader(
   public CloseableIterator<InputRow> read() throws IOException
   {
     final KafkaRecordEntity record = source.getEntity();
+    final Map<String, Object> mergedHeaderMap = extractHeader(record);
+
+    // Ignore tombstone records that have null values.
+    if (record.getRecord().value() != null) {
+      return buildBlendedRows(valueParser, mergedHeaderMap);
+    } else {
+      final List<InputRow> rows = Collections.singletonList(buildRowWithoutValuePayload(mergedHeaderMap));
+      return CloseableIterators.withEmptyBaggage(rows.iterator());
+    }
+  }
+
+  @Override
+  public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
+  {
+    final KafkaRecordEntity record = source.getEntity();
+    final Map<String, Object> mergedHeaderMap = extractHeader(record);
+    if (record.getRecord().value() != null) {
+      return buildBlendedRowsSample(valueParser, mergedHeaderMap);
+    } else {
+      final List<InputRowListPlusRawValues> rows = Collections.singletonList(
+          InputRowListPlusRawValues.of(buildRowWithoutValuePayload(mergedHeaderMap), mergedHeaderMap)
+      );
+      return CloseableIterators.withEmptyBaggage(rows.iterator());
+    }
+  }
+
+  private Map<String, Object> extractHeader(KafkaRecordEntity record) throws IOException
+  {
     final Map<String, Object> mergedHeaderMap = new HashMap<>();
     if (headerParserSupplier != null) {
       KafkaHeaderReader headerParser = headerParserSupplier.apply(record);
@@ -123,19 +151,7 @@ public CloseableIterator<InputRow> read() throws IOException
         );
       }
     }
-
-    // Ignore tombstone records that have null values.
-    if (record.getRecord().value() != null) {
-      return buildBlendedRows(valueParser, mergedHeaderMap);
-    } else {
-      return buildRowsWithoutValuePayload(mergedHeaderMap);
-    }
-  }
-
-  @Override
-  public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
-  {
-    return read().map(row -> InputRowListPlusRawValues.of(row, ((MapBasedInputRow) row).getEvent()));
+    return mergedHeaderMap;
   }
 
   private List<String> getFinalDimensionList(Set<String> newDimensions)
@@ -185,15 +201,59 @@ private CloseableIterator<InputRow> buildBlendedRows(
     );
   }
 
-  private CloseableIterator<InputRow> buildRowsWithoutValuePayload(Map<String, Object> headerKeyList)
+  private CloseableIterator<InputRowListPlusRawValues> buildBlendedRowsSample(
+      InputEntityReader valueParser,
+      Map<String, Object> headerKeyList
+  ) throws IOException
+  {
+    return valueParser.sample().map(
+        rowAndValues -> {
+          if (rowAndValues.getParseException() != null) {
+            return rowAndValues;
+          }
+          List<InputRow> newInputRows = Lists.newArrayListWithCapacity(rowAndValues.getInputRows().size());
+          List<Map<String, Object>> newRawRows = Lists.newArrayListWithCapacity(rowAndValues.getRawValues().size());
+          for (InputRow r : rowAndValues.getInputRows()) {
+            final MapBasedInputRow valueRow;
+            try {
+              valueRow = (MapBasedInputRow) r;
+            }
+            catch (ClassCastException e) {
+              throw new ParseException(
+                  null,
+                  "Unsupported input format in valueFormat. KafkaInputFormat only supports input formats that return MapBasedInputRow rows"
+              );
+            }
+
+            final Map<String, Object> event = buildBlendedEventMap(valueRow.getEvent(), headerKeyList);
+            final HashSet<String> newDimensions = new HashSet<>(valueRow.getDimensions());
+            newDimensions.addAll(headerKeyList.keySet());
+            // Remove the dummy timestamp added in KafkaInputFormat
+            newDimensions.remove(KafkaInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING);
+
+            newRawRows.add(event);
+            if (rowAndValues.getParseException() == null) {
+              newInputRows.add(
+                  new MapBasedInputRow(
+                      inputRowSchema.getTimestampSpec().extractTimestamp(event),
+                      getFinalDimensionList(newDimensions),
+                      event
+                  )
+              );
+            }
+          }
+          return InputRowListPlusRawValues.ofList(newRawRows, newInputRows, rowAndValues.getParseException());
+        }
+    );
+  }
+
+  private InputRow buildRowWithoutValuePayload(Map<String, Object> headerKeyList)
   {
-    final InputRow row = new MapBasedInputRow(
+    return new MapBasedInputRow(
         inputRowSchema.getTimestampSpec().extractTimestamp(headerKeyList),
         getFinalDimensionList(headerKeyList.keySet()),
         headerKeyList
     );
-    final List<InputRow> rows = Collections.singletonList(row);
-    return CloseableIterators.withEmptyBaggage(rows.iterator());
   }
 
   /**
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
index 47327dbeee14..242e1fa4a121 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
@@ -34,6 +34,7 @@ import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.impl.StringInputRowParser;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.data.input.kafkainput.KafkaInputFormat;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
 import org.apache.druid.indexing.kafka.test.TestBroker;
@@ -94,6 +95,26 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
       null
   );
 
+  private static final DataSchema DATA_SCHEMA_KAFKA_TIMESTAMP = new DataSchema(
+      "test_ds",
+      new TimestampSpec("kafka.timestamp", "iso", null),
+      new DimensionsSpec(
+          Arrays.asList(
+              new StringDimensionSchema("dim1"),
+              new StringDimensionSchema("dim1t"),
+              new StringDimensionSchema("dim2"),
+              new LongDimensionSchema("dimLong"),
+              new FloatDimensionSchema("dimFloat")
+          )
+      ),
+      new AggregatorFactory[]{
+          new DoubleSumAggregatorFactory("met1sum", "met1"),
+          new CountAggregatorFactory("rows")
+      },
+      new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
+      null
+  );
+
   private static TestingCluster zkServer;
   private static TestBroker kafkaServer;
 
@@ -126,7 +147,7 @@ public static void tearDownClass() throws Exception
     zkServer.stop();
   }
 
-  @Test(timeout = 30_000L)
+  @Test
   public void testSample()
   {
     insertData(generateRecords(TOPIC));
@@ -169,7 +190,7 @@ public void testSample()
 
     KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
         supervisorSpec,
-        new SamplerConfig(5, null, null, null),
+        new SamplerConfig(5, 5_000, null, null),
         new InputSourceSampler(OBJECT_MAPPER),
         OBJECT_MAPPER
     );
@@ -177,6 +198,90 @@ public void testSample()
     runSamplerAndCompareResponse(samplerSpec, true);
   }
 
+  @Test
+  public void testSampleKafkaInputFormat()
+  {
+    insertData(generateRecords(TOPIC));
+
+    KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec(
+        null,
+        DATA_SCHEMA_KAFKA_TIMESTAMP,
+        null,
+        new KafkaSupervisorIOConfig(
+            TOPIC,
+            new KafkaInputFormat(
+                null,
+                null,
+                new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
+                null,
+                null,
+                null
+            ),
+
+            null,
+            null,
+            null,
+            kafkaServer.consumerProperties(),
+            null,
+            null,
+            null,
+            null,
+            true,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null
+        ),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null
+    );
+
+    KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
+        supervisorSpec,
+        new SamplerConfig(5, 5_000, null, null),
+        new InputSourceSampler(OBJECT_MAPPER),
+        OBJECT_MAPPER
+    );
+
+    SamplerResponse response = samplerSpec.sample();
+
+    Assert.assertEquals(5, response.getNumRowsRead());
+    // we can parse an extra row compared to the other generated data samples because we are using the kafka
+    // timestamp as the row timestamp
+    Assert.assertEquals(4, response.getNumRowsIndexed());
+    Assert.assertEquals(5, response.getData().size());
+
+    Iterator<SamplerResponse.SamplerResponseRow> it = response.getData().iterator();
+
+    SamplerResponse.SamplerResponseRow nextRow;
+    Map<String, Object> rawInput;
+    Map<String, Object> parsedInput;
+
+    for (int i = 0; i < 4; i++) {
+      nextRow = it.next();
+      Assert.assertNull(nextRow.isUnparseable());
+      rawInput = nextRow.getInput();
+      parsedInput = nextRow.getParsed();
+      Assert.assertTrue(rawInput.containsKey("kafka.timestamp"));
+      Assert.assertEquals(rawInput.get("kafka.timestamp"), parsedInput.get("__time"));
+    }
+    nextRow = it.next();
+    Assert.assertTrue(nextRow.isUnparseable());
+
+    Assert.assertFalse(it.hasNext());
+  }
+
   @Test
   public void testWithInputRowParser() throws IOException
   {
@@ -245,7 +350,7 @@ public void testWithInputRowParser() throws IOException
 
     KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
         supervisorSpec,
-        new SamplerConfig(5, null, null, null),
+        new SamplerConfig(5, 5_000, null, null),
         new InputSourceSampler(new DefaultObjectMapper()),
         OBJECT_MAPPER
     );

From 4ac1787666deab369bf1c545638bcc0b199c49ed Mon Sep 17 00:00:00 2001
From: Clint Wylie
Date: Tue, 7 Mar 2023 23:30:10 -0800
Subject: [PATCH 2/2] handle key format sampling the same as value format
 sampling

---
 .../input/kafkainput/KafkaInputReader.java | 115 ++++++++++++------
 1 file changed, 75 insertions(+), 40 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
index e9f5264919ea..6e6ac8fa10e9 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
@@ -89,14 +89,13 @@ public KafkaInputReader(
   public CloseableIterator<InputRow> read() throws IOException
   {
     final KafkaRecordEntity record = source.getEntity();
-    final Map<String, Object> mergedHeaderMap = extractHeader(record);
+    final Map<String, Object> mergedHeaderMap = extractHeaderAndKeys(record);
 
     // Ignore tombstone records that have null values.
     if (record.getRecord().value() != null) {
       return buildBlendedRows(valueParser, mergedHeaderMap);
     } else {
-      final List<InputRow> rows = Collections.singletonList(buildRowWithoutValuePayload(mergedHeaderMap));
-      return CloseableIterators.withEmptyBaggage(rows.iterator());
+      return CloseableIterators.withEmptyBaggage(buildInputRowsForMap(mergedHeaderMap).iterator());
     }
   }
 
@@ -104,18 +103,28 @@ public CloseableIterator<InputRow> read() throws IOException
   public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
   {
     final KafkaRecordEntity record = source.getEntity();
-    final Map<String, Object> mergedHeaderMap = extractHeader(record);
+    InputRowListPlusRawValues keysAndHeader = extractHeaderAndKeysSample(record);
     if (record.getRecord().value() != null) {
-      return buildBlendedRowsSample(valueParser, mergedHeaderMap);
+      return buildBlendedRowsSample(valueParser, keysAndHeader.getRawValues());
     } else {
-      final List<InputRowListPlusRawValues> rows = Collections.singletonList(
-          InputRowListPlusRawValues.of(buildRowWithoutValuePayload(mergedHeaderMap), mergedHeaderMap)
-      );
+      final List<InputRowListPlusRawValues> rows = Collections.singletonList(keysAndHeader);
       return CloseableIterators.withEmptyBaggage(rows.iterator());
     }
   }
 
-  private Map<String, Object> extractHeader(KafkaRecordEntity record) throws IOException
+  private List<String> getFinalDimensionList(Set<String> newDimensions)
+  {
+    final List<String> schemaDimensions = inputRowSchema.getDimensionsSpec().getDimensionNames();
+    if (!schemaDimensions.isEmpty()) {
+      return schemaDimensions;
+    } else {
+      return Lists.newArrayList(
+          Sets.difference(newDimensions, inputRowSchema.getDimensionsSpec().getDimensionExclusions())
+      );
+    }
+  }
+
+  private Map<String, Object> extractHeader(KafkaRecordEntity record)
   {
     final Map<String, Object> mergedHeaderMap = new HashMap<>();
     if (headerParserSupplier != null) {
       KafkaHeaderReader headerParser = headerParserSupplier.apply(record);
@@ -130,7 +139,13 @@ private Map<String, Object> extractHeader(KafkaRecordEntity record) throws IOExc
     // the header list
     mergedHeaderMap.putIfAbsent(timestampColumnName, record.getRecord().timestamp());
 
-    InputEntityReader keyParser = (keyParserSupplier == null) ? null : keyParserSupplier.apply(record);
+    return mergedHeaderMap;
+  }
+
+  private Map<String, Object> extractHeaderAndKeys(KafkaRecordEntity record) throws IOException
+  {
+    final Map<String, Object> mergedHeaderMap = extractHeader(record);
+    final InputEntityReader keyParser = (keyParserSupplier == null) ? null : keyParserSupplier.apply(record);
     if (keyParser != null) {
       try (CloseableIterator<InputRow> keyIterator = keyParser.read()) {
         // Key currently only takes the first row and ignores the rest.
@@ -154,18 +169,6 @@ private Map<String, Object> extractHeader(KafkaRecordEntity record) throws IOExc
     return mergedHeaderMap;
   }
 
-  private List<String> getFinalDimensionList(Set<String> newDimensions)
-  {
-    final List<String> schemaDimensions = inputRowSchema.getDimensionsSpec().getDimensionNames();
-    if (!schemaDimensions.isEmpty()) {
-      return schemaDimensions;
-    } else {
-      return Lists.newArrayList(
-          Sets.difference(newDimensions, inputRowSchema.getDimensionsSpec().getDimensionExclusions())
-      );
-    }
-  }
-
   private CloseableIterator<InputRow> buildBlendedRows(
       InputEntityReader valueParser,
       Map<String, Object> headerKeyList
@@ -201,6 +204,34 @@ private CloseableIterator<InputRow> buildBlendedRows(
     );
   }
 
+  private InputRowListPlusRawValues extractHeaderAndKeysSample(KafkaRecordEntity record) throws IOException
+  {
+    Map<String, Object> mergedHeaderMap = extractHeader(record);
+    InputEntityReader keyParser = (keyParserSupplier == null) ? null : keyParserSupplier.apply(record);
+    if (keyParser != null) {
+      try (CloseableIterator<InputRowListPlusRawValues> keyIterator = keyParser.sample()) {
+        // Key currently only takes the first row and ignores the rest.
+        if (keyIterator.hasNext()) {
+          // Return type for the key parser should be of type MapBasedInputRow
+          // Parsers returning other types are not compatible currently.
+          InputRowListPlusRawValues keyRow = keyIterator.next();
+          // Add the key to the mergeList only if the key string is not already present
+          mergedHeaderMap.putIfAbsent(
+              keyColumnName,
+              keyRow.getRawValues().entrySet().stream().findFirst().get().getValue()
+          );
+          return InputRowListPlusRawValues.of(buildInputRowsForMap(mergedHeaderMap), mergedHeaderMap);
+        }
+      }
+      catch (ClassCastException e) {
+        throw new IOException(
+            "Unsupported keyFormat. KafkaInputFormat only supports input formats that return MapBasedInputRow rows"
+        );
+      }
+    }
+    return InputRowListPlusRawValues.of(buildInputRowsForMap(mergedHeaderMap), mergedHeaderMap);
+  }
+
   private CloseableIterator<InputRowListPlusRawValues> buildBlendedRowsSample(
       InputEntityReader valueParser,
       Map<String, Object> headerKeyList
@@ -213,26 +244,28 @@ private CloseableIterator<InputRowListPlusRawValues> buildBlendedRowsSample(
           }
           List<InputRow> newInputRows = Lists.newArrayListWithCapacity(rowAndValues.getInputRows().size());
           List<Map<String, Object>> newRawRows = Lists.newArrayListWithCapacity(rowAndValues.getRawValues().size());
+          ParseException parseException = null;
+
+          for (Map<String, Object> raw : rowAndValues.getRawValuesList()) {
+            newRawRows.add(buildBlendedEventMap(raw, headerKeyList));
+          }
           for (InputRow r : rowAndValues.getInputRows()) {
-            final MapBasedInputRow valueRow;
+            MapBasedInputRow valueRow = null;
             try {
               valueRow = (MapBasedInputRow) r;
             }
             catch (ClassCastException e) {
-              throw new ParseException(
+              parseException = new ParseException(
                   null,
                   "Unsupported input format in valueFormat. KafkaInputFormat only supports input formats that return MapBasedInputRow rows"
               );
             }
-
-            final Map<String, Object> event = buildBlendedEventMap(valueRow.getEvent(), headerKeyList);
-            final HashSet<String> newDimensions = new HashSet<>(valueRow.getDimensions());
-            newDimensions.addAll(headerKeyList.keySet());
-            // Remove the dummy timestamp added in KafkaInputFormat
-            newDimensions.remove(KafkaInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING);
-
-            newRawRows.add(event);
-            if (rowAndValues.getParseException() == null) {
+            if (valueRow != null) {
+              final Map<String, Object> event = buildBlendedEventMap(valueRow.getEvent(), headerKeyList);
+              final HashSet<String> newDimensions = new HashSet<>(valueRow.getDimensions());
+              newDimensions.addAll(headerKeyList.keySet());
+              // Remove the dummy timestamp added in KafkaInputFormat
+              newDimensions.remove(KafkaInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING);
               newInputRows.add(
                   new MapBasedInputRow(
                       inputRowSchema.getTimestampSpec().extractTimestamp(event),
@@ -242,17 +275,19 @@ private CloseableIterator<InputRowListPlusRawValues> buildBlendedRowsSample(
              );
             }
           }
-          return InputRowListPlusRawValues.ofList(newRawRows, newInputRows, rowAndValues.getParseException());
+          return InputRowListPlusRawValues.ofList(newRawRows, newInputRows, parseException);
         }
     );
   }
-
-  private InputRow buildRowWithoutValuePayload(Map<String, Object> headerKeyList)
+
+  private List<InputRow> buildInputRowsForMap(Map<String, Object> headerKeyList)
   {
-    return new MapBasedInputRow(
-        inputRowSchema.getTimestampSpec().extractTimestamp(headerKeyList),
-        getFinalDimensionList(headerKeyList.keySet()),
-        headerKeyList
+    return Collections.singletonList(
+        new MapBasedInputRow(
+            inputRowSchema.getTimestampSpec().extractTimestamp(headerKeyList),
+            getFinalDimensionList(headerKeyList.keySet()),
+            headerKeyList
+        )
     );
   }
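
Reviewer note (not part of the patch series): PATCH 1/2 addresses two issues.
First, KafkaInputFormat.createReader() unconditionally cast its source to
SettableByteEntity<KafkaRecordEntity>; streaming tasks always supply that
wrapper, but the Sampler API hands the reader a raw KafkaRecordEntity, so
sampling failed with a ClassCastException. Second, KafkaInputReader.sample()
previously delegated to read(), discarding raw values and parse errors. Below
is a minimal sketch of the sampler-side call path; the scaffolding (class name,
method, topic name, and arguments) is hypothetical, while the Druid types and
calls are the ones appearing in the diffs above.

  import java.io.File;
  import java.io.IOException;
  import org.apache.druid.data.input.InputEntityReader;
  import org.apache.druid.data.input.InputRowListPlusRawValues;
  import org.apache.druid.data.input.InputRowSchema;
  import org.apache.druid.data.input.kafka.KafkaRecordEntity;
  import org.apache.druid.data.input.kafkainput.KafkaInputFormat;
  import org.apache.druid.java.util.common.parsers.CloseableIterator;
  import org.apache.kafka.clients.consumer.ConsumerRecord;

  class SamplerPathSketch
  {
    // format, schema, and tempDir stand in for the objects the sampler builds
    // from the supervisor spec submitted to the sampler API.
    static void sampleOneRecord(
        KafkaInputFormat format,
        InputRowSchema schema,
        File tempDir,
        byte[] key,
        byte[] value
    ) throws IOException
    {
      // The sampler hands createReader() a bare KafkaRecordEntity...
      final KafkaRecordEntity entity = new KafkaRecordEntity(
          new ConsumerRecord<>("topic", 0, 0L, key, value)
      );
      // ...which used to fail the unconditional SettableByteEntity cast;
      // with PATCH 1/2, createReader() wraps the entity on the fly instead.
      final InputEntityReader reader = format.createReader(schema, entity, tempDir);
      // The dedicated sample() implementation keeps raw values, parsed rows,
      // and any ParseException together for each sampled record.
      try (CloseableIterator<InputRowListPlusRawValues> iterator = reader.sample()) {
        while (iterator.hasNext()) {
          final InputRowListPlusRawValues sampled = iterator.next();
          // Roughly: getRawValuesList() feeds "input", getInputRows() feeds
          // "parsed", and getParseException() marks the row as unparseable
          // in the resulting SamplerResponse rows.
          sampled.getParseException();
        }
      }
    }
  }

PATCH 2/2 then extends the same treatment to the key format (keyParser.sample()
rather than keyParser.read() when sampling) and reports an unsupported value
row type as a ParseException on the sampled row instead of throwing
mid-iteration.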