From ecb023812f2268e7db3554b78133f3f50843c72f Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 20 Jun 2024 11:57:36 +0530 Subject: [PATCH 1/5] Support ListBasedInputRow in Kafka ingestion with header --- .../input/kafkainput/KafkaInputReader.java | 33 +++++++++++++++---- 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java index 31b7cf66be19..0edcb231c58b 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java @@ -24,6 +24,7 @@ import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowListPlusRawValues; import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.ListBasedInputRow; import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.impl.MapInputRowParser; import org.apache.druid.data.input.kafka.KafkaRecordEntity; @@ -173,14 +174,24 @@ private CloseableIterator buildBlendedRows( r -> { final MapBasedInputRow valueRow; try { - // Return type for the value parser should be of type MapBasedInputRow - // Parsers returning other types are not compatible currently. - valueRow = (MapBasedInputRow) r; + if (r instanceof ListBasedInputRow) { + valueRow = new MapBasedInputRow( + r.getTimestamp(), + r.getDimensions(), + ((ListBasedInputRow) r).asMap() + ); + } else { + // Return type for the value parser should be of type MapBasedInputRow + // Parsers returning other types are not compatible currently. + valueRow = (MapBasedInputRow) r; + } } catch (ClassCastException e) { throw new ParseException( null, - "Unsupported input format in valueFormat. KafkaInputFormat only supports input format that return MapBasedInputRow rows" + "Unsupported input format[%s] in valueFormat. KafkaInputFormat only supports" + + " input formats that return MapBasedInputRows or ListBasedInputRows.", + r.getClass().getSimpleName() ); } @@ -252,12 +263,22 @@ private CloseableIterator buildBlendedRowsSample( for (InputRow r : rowAndValues.getInputRows()) { MapBasedInputRow valueRow = null; try { - valueRow = (MapBasedInputRow) r; + if (r instanceof ListBasedInputRow) { + valueRow = new MapBasedInputRow( + r.getTimestamp(), + r.getDimensions(), + ((ListBasedInputRow) r).asMap() + ); + } else { + valueRow = (MapBasedInputRow) r; + } } catch (ClassCastException e) { parseException = new ParseException( null, - "Unsupported input format in valueFormat. KafkaInputFormat only supports input format that return MapBasedInputRow rows" + "Unsupported input format[%s] in valueFormat. KafkaInputFormat only supports" + + " input formats that return MapBasedInputRows or ListBasedInputRows.", + r.getClass().getSimpleName() ); } if (valueRow != null) { From 79a5610263123b520ccef302acf79b2e6de5ba85 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 20 Jun 2024 19:44:20 +0530 Subject: [PATCH 2/5] Fix up buildBlendedEventMap --- .../input/kafkainput/KafkaInputReader.java | 79 ++++++------------- .../kafkainput/KafkaInputFormatTest.java | 8 +- 2 files changed, 26 insertions(+), 61 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java index 0edcb231c58b..9d356d4a2e9b 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java @@ -24,7 +24,6 @@ import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowListPlusRawValues; import org.apache.druid.data.input.InputRowSchema; -import org.apache.druid.data.input.ListBasedInputRow; import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.impl.MapInputRowParser; import org.apache.druid.data.input.kafka.KafkaRecordEntity; @@ -32,7 +31,6 @@ import org.apache.druid.java.util.common.CloseableIterators; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.parsers.CloseableIterator; -import org.apache.druid.java.util.common.parsers.ParseException; import org.joda.time.DateTime; import javax.annotation.Nullable; @@ -172,31 +170,8 @@ private CloseableIterator buildBlendedRows( { return valueParser.read().map( r -> { - final MapBasedInputRow valueRow; - try { - if (r instanceof ListBasedInputRow) { - valueRow = new MapBasedInputRow( - r.getTimestamp(), - r.getDimensions(), - ((ListBasedInputRow) r).asMap() - ); - } else { - // Return type for the value parser should be of type MapBasedInputRow - // Parsers returning other types are not compatible currently. - valueRow = (MapBasedInputRow) r; - } - } - catch (ClassCastException e) { - throw new ParseException( - null, - "Unsupported input format[%s] in valueFormat. KafkaInputFormat only supports" - + " input formats that return MapBasedInputRows or ListBasedInputRows.", - r.getClass().getSimpleName() - ); - } - - final Map event = buildBlendedEventMap(valueRow.getEvent(), headerKeyList); - final HashSet newDimensions = new HashSet<>(valueRow.getDimensions()); + final HashSet newDimensions = new HashSet<>(r.getDimensions()); + final Map event = buildBlendedEventMap(r::getRaw, newDimensions, headerKeyList); newDimensions.addAll(headerKeyList.keySet()); // Remove the dummy timestamp added in KafkaInputFormat newDimensions.remove(KafkaInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING); @@ -255,35 +230,18 @@ private CloseableIterator buildBlendedRowsSample( } List newInputRows = Lists.newArrayListWithCapacity(rowAndValues.getInputRows().size()); List> newRawRows = Lists.newArrayListWithCapacity(rowAndValues.getRawValues().size()); - ParseException parseException = null; for (Map raw : rowAndValues.getRawValuesList()) { - newRawRows.add(buildBlendedEventMap(raw, headerKeyList)); + newRawRows.add(buildBlendedEventMap(raw::get, raw.keySet(), headerKeyList)); } for (InputRow r : rowAndValues.getInputRows()) { - MapBasedInputRow valueRow = null; - try { - if (r instanceof ListBasedInputRow) { - valueRow = new MapBasedInputRow( - r.getTimestamp(), - r.getDimensions(), - ((ListBasedInputRow) r).asMap() - ); - } else { - valueRow = (MapBasedInputRow) r; - } - } - catch (ClassCastException e) { - parseException = new ParseException( - null, - "Unsupported input format[%s] in valueFormat. KafkaInputFormat only supports" - + " input formats that return MapBasedInputRows or ListBasedInputRows.", - r.getClass().getSimpleName() + if (r != null) { + final HashSet newDimensions = new HashSet<>(r.getDimensions()); + final Map event = buildBlendedEventMap( + r::getRaw, + newDimensions, + headerKeyList ); - } - if (valueRow != null) { - final Map event = buildBlendedEventMap(valueRow.getEvent(), headerKeyList); - final HashSet newDimensions = new HashSet<>(valueRow.getDimensions()); newDimensions.addAll(headerKeyList.keySet()); // Remove the dummy timestamp added in KafkaInputFormat newDimensions.remove(KafkaInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING); @@ -300,7 +258,7 @@ private CloseableIterator buildBlendedRowsSample( ); } } - return InputRowListPlusRawValues.ofList(newRawRows, newInputRows, parseException); + return InputRowListPlusRawValues.ofList(newRawRows, newInputRows, null); } ); } @@ -323,22 +281,31 @@ private List buildInputRowsForMap(Map headerKeyList) /** * Builds a map that blends two {@link Map}, presenting the combined keyset of both maps, and preferring to read * from the first map and falling back to the second map if the value is not present. - * + *

* This strategy is used rather than just copying the values of the keyset into a new map so that any 'flattening' * machinery (such as {@link Map} created by {@link org.apache.druid.java.util.common.parsers.ObjectFlatteners}) is * still in place to be lazily evaluated instead of eagerly copying. */ - private static Map buildBlendedEventMap(Map map, Map fallback) + private static Map buildBlendedEventMap( + Function getRowValue, + Set rowDimensions, + Map fallback + ) { final Set keySet = new HashSet<>(fallback.keySet()); - keySet.addAll(map.keySet()); + keySet.addAll(rowDimensions); return new AbstractMap() { @Override public Object get(Object key) { - return map.getOrDefault((String) key, fallback.get(key)); + final String skey = (String) key; + final Object val = getRowValue.apply(skey); + if (val == null) { + return fallback.get(skey); + } + return val; } @Override diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java index 858cd79fbd78..99a4a8981407 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java @@ -696,11 +696,9 @@ public void testMissingTimestampThrowsException() throws IOException try (CloseableIterator iterator = reader.read()) { while (iterator.hasNext()) { - Throwable t = Assert.assertThrows(ParseException.class, () -> iterator.next()); - Assert.assertEquals( - "Timestamp[null] is unparseable! Event: {kafka.newtopic.topic=sample, foo=x, kafka.newts" - + ".timestamp=1624492800000, kafka.newkey.key=sampleKey...", - t.getMessage() + Throwable t = Assert.assertThrows(ParseException.class, iterator::next); + Assert.assertTrue( + t.getMessage().startsWith("Timestamp[null] is unparseable! Event: {") ); } } From 2455f4e84cf09572345a8c66e0e3c8ffcf8c3533 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 21 Jun 2024 09:08:15 +0530 Subject: [PATCH 3/5] Add new test for KafkaInputFormat with csv value and headers --- .../kafkainput/KafkaInputFormatTest.java | 335 ++++++++---------- 1 file changed, 148 insertions(+), 187 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java index 99a4a8981407..b7b57b6004b8 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java @@ -24,10 +24,12 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; +import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.ColumnsFilter; import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.impl.CsvInputFormat; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.TimestampSpec; @@ -57,9 +59,27 @@ public class KafkaInputFormatTest { - private KafkaRecordEntity inputEntity; - private final long timestamp = DateTimes.of("2021-06-24").getMillis(); + static { + NullHandling.initializeForTests(); + } + + private static final long TIMESTAMP_MILLIS = DateTimes.of("2021-06-24").getMillis(); private static final String TOPIC = "sample"; + private static final byte[] SIMPLE_JSON_KEY_BYTES = StringUtils.toUtf8( + "{'key': 'sampleKey'}" + .replaceAll("'", "\"") + ); + private static final byte[] SIMPLE_JSON_VALUE_BYTES = StringUtils.toUtf8( + ("{" + + " 'timestamp': '2021-06-25'," + + " 'bar': null," + + " 'foo': 'x'," + + " 'baz': 4," + + " 'o': {'mg': 1}" + + "}") + .replaceAll("'", "\"") + ); + private static final Iterable

SAMPLE_HEADERS = ImmutableList.of( new Header() { @@ -177,26 +197,9 @@ public void testSerde() throws JsonProcessingException @Test public void testWithHeaderKeyAndValue() throws IOException { - final byte[] key = StringUtils.toUtf8( - "{\n" - + " \"key\": \"sampleKey\"\n" - + "}" - ); - - final byte[] payload = StringUtils.toUtf8( - "{\n" - + " \"timestamp\": \"2021-06-25\",\n" - + " \"bar\": null,\n" - + " \"foo\": \"x\",\n" - + " \"baz\": 4,\n" - + " \"o\": {\n" - + " \"mg\": 1\n" - + " }\n" - + "}" - ); - Headers headers = new RecordHeaders(SAMPLE_HEADERS); - inputEntity = makeInputEntity(key, payload, headers); + KafkaRecordEntity inputEntity = + makeInputEntity(SIMPLE_JSON_KEY_BYTES, SIMPLE_JSON_VALUE_BYTES, headers); final InputEntityReader reader = format.createReader( new InputRowSchema( @@ -248,21 +251,7 @@ public void testWithHeaderKeyAndValue() throws IOException Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg"))); Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o")); - // Header verification - Assert.assertEquals("application/json", Iterables.getOnlyElement(row.getDimension("kafka.newheader.encoding"))); - Assert.assertEquals("pkc-bar", Iterables.getOnlyElement(row.getDimension("kafka.newheader.kafkapkc"))); - Assert.assertEquals( - String.valueOf(DateTimes.of("2021-06-24").getMillis()), - Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp")) - ); - Assert.assertEquals( - TOPIC, - Iterables.getOnlyElement(row.getDimension("kafka.newtopic.topic")) - ); - Assert.assertEquals( - "2021-06-25", - Iterables.getOnlyElement(row.getDimension("timestamp")) - ); + verifyHeader(row); // Key verification Assert.assertEquals("sampleKey", Iterables.getOnlyElement(row.getDimension("kafka.newkey.key"))); @@ -282,20 +271,8 @@ public void testWithHeaderKeyAndValue() throws IOException //Headers cannot be null, so testing only no key use case! public void testWithOutKey() throws IOException { - final byte[] payload = StringUtils.toUtf8( - "{\n" - + " \"timestamp\": \"2021-06-24\",\n" - + " \"bar\": null,\n" - + " \"foo\": \"x\",\n" - + " \"baz\": 4,\n" - + " \"o\": {\n" - + " \"mg\": 1\n" - + " }\n" - + "}" - ); - Headers headers = new RecordHeaders(SAMPLE_HEADERS); - inputEntity = makeInputEntity(null, payload, headers); + KafkaRecordEntity inputEntity = makeInputEntity(null, SIMPLE_JSON_VALUE_BYTES, headers); final InputEntityReader reader = format.createReader( new InputRowSchema( @@ -338,7 +315,7 @@ public void testWithOutKey() throws IOException @Test public void testTimestampFromHeader() throws IOException { - Iterable
sample_header_with_ts = Iterables.unmodifiableIterable( + final Iterable
sampleHeaderWithTs = Iterables.unmodifiableIterable( Iterables.concat( SAMPLE_HEADERS, ImmutableList.of( @@ -359,26 +336,9 @@ public byte[] value() ) ) ); - final byte[] key = StringUtils.toUtf8( - "{\n" - + " \"key\": \"sampleKey\"\n" - + "}" - ); - - final byte[] payload = StringUtils.toUtf8( - "{\n" - + " \"timestamp\": \"2021-06-24\",\n" - + " \"bar\": null,\n" - + " \"foo\": \"x\",\n" - + " \"baz\": 4,\n" - + " \"o\": {\n" - + " \"mg\": 1\n" - + " }\n" - + "}" - ); - - Headers headers = new RecordHeaders(sample_header_with_ts); - inputEntity = makeInputEntity(key, payload, headers); + Headers headers = new RecordHeaders(sampleHeaderWithTs); + KafkaRecordEntity inputEntity = + makeInputEntity(SIMPLE_JSON_KEY_BYTES, SIMPLE_JSON_VALUE_BYTES, headers); final InputEntityReader reader = format.createReader( new InputRowSchema( @@ -417,21 +377,7 @@ public byte[] value() Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg"))); Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o")); - // Header verification - Assert.assertEquals("application/json", Iterables.getOnlyElement(row.getDimension("kafka.newheader.encoding"))); - Assert.assertEquals("pkc-bar", Iterables.getOnlyElement(row.getDimension("kafka.newheader.kafkapkc"))); - Assert.assertEquals( - String.valueOf(DateTimes.of("2021-06-24").getMillis()), - Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp")) - ); - Assert.assertEquals( - "2021-06-24", - Iterables.getOnlyElement(row.getDimension("kafka.newheader.headerTs")) - ); - Assert.assertEquals( - "2021-06-24", - Iterables.getOnlyElement(row.getDimension("timestamp")) - ); + verifyHeader(row); // Key verification Assert.assertEquals("sampleKey", Iterables.getOnlyElement(row.getDimension("kafka.newkey.key"))); @@ -450,20 +396,9 @@ public byte[] value() @Test public void testWithOutKeyAndHeaderSpecs() throws IOException { - final byte[] payload = StringUtils.toUtf8( - "{\n" - + " \"timestamp\": \"2021-06-24\",\n" - + " \"bar\": null,\n" - + " \"foo\": \"x\",\n" - + " \"baz\": 4,\n" - + " \"o\": {\n" - + " \"mg\": 1\n" - + " }\n" - + "}" - ); - Headers headers = new RecordHeaders(SAMPLE_HEADERS); - inputEntity = makeInputEntity(null, payload, headers); + KafkaRecordEntity inputEntity = + makeInputEntity(null, SIMPLE_JSON_VALUE_BYTES, headers); KafkaInputFormat localFormat = new KafkaInputFormat( null, @@ -590,7 +525,7 @@ public void testWithMultipleMixedRecords() throws IOException for (int i = 0; i < keys.length; i++) { headers = headers.add(new RecordHeader("indexH", String.valueOf(i).getBytes(StandardCharsets.UTF_8))); - inputEntity = makeInputEntity(keys[i], values[i], headers); + KafkaRecordEntity inputEntity = makeInputEntity(keys[i], values[i], headers); settableByteEntity.setEntity(inputEntity); final int numExpectedIterations = 1; @@ -612,7 +547,6 @@ public void testWithMultipleMixedRecords() throws IOException Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o")); Assert.assertEquals(String.valueOf(i), Iterables.getOnlyElement(row.getDimension("index"))); - // Header verification Assert.assertEquals( "application/json", @@ -652,26 +586,9 @@ public void testWithMultipleMixedRecords() throws IOException @Test public void testMissingTimestampThrowsException() throws IOException { - final byte[] key = StringUtils.toUtf8( - "{\n" - + " \"key\": \"sampleKey\"\n" - + "}" - ); - - final byte[] payload = StringUtils.toUtf8( - "{\n" - + " \"timestamp\": \"2021-06-25\",\n" - + " \"bar\": null,\n" - + " \"foo\": \"x\",\n" - + " \"baz\": 4,\n" - + " \"o\": {\n" - + " \"mg\": 1\n" - + " }\n" - + "}" - ); - Headers headers = new RecordHeaders(SAMPLE_HEADERS); - inputEntity = makeInputEntity(key, payload, headers); + KafkaRecordEntity inputEntity = + makeInputEntity(SIMPLE_JSON_KEY_BYTES, SIMPLE_JSON_VALUE_BYTES, headers); final InputEntityReader reader = format.createReader( new InputRowSchema( @@ -707,27 +624,9 @@ public void testMissingTimestampThrowsException() throws IOException @Test public void testWithSchemaDiscovery() throws IOException { - // testWithHeaderKeyAndValue + schemaless - final byte[] key = StringUtils.toUtf8( - "{\n" - + " \"key\": \"sampleKey\"\n" - + "}" - ); - - final byte[] payload = StringUtils.toUtf8( - "{\n" - + " \"timestamp\": \"2021-06-25\",\n" - + " \"bar\": null,\n" - + " \"foo\": \"x\",\n" - + " \"baz\": 4,\n" - + " \"o\": {\n" - + " \"mg\": 1\n" - + " }\n" - + "}" - ); - Headers headers = new RecordHeaders(SAMPLE_HEADERS); - inputEntity = makeInputEntity(key, payload, headers); + KafkaRecordEntity inputEntity = + makeInputEntity(SIMPLE_JSON_KEY_BYTES, SIMPLE_JSON_VALUE_BYTES, headers); final InputEntityReader reader = format.createReader( new InputRowSchema( @@ -775,21 +674,7 @@ public void testWithSchemaDiscovery() throws IOException Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg"))); Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o")); - // Header verification - Assert.assertEquals("application/json", Iterables.getOnlyElement(row.getDimension("kafka.newheader.encoding"))); - Assert.assertEquals("pkc-bar", Iterables.getOnlyElement(row.getDimension("kafka.newheader.kafkapkc"))); - Assert.assertEquals( - String.valueOf(DateTimes.of("2021-06-24").getMillis()), - Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp")) - ); - Assert.assertEquals( - TOPIC, - Iterables.getOnlyElement(row.getDimension("kafka.newtopic.topic")) - ); - Assert.assertEquals( - "2021-06-25", - Iterables.getOnlyElement(row.getDimension("timestamp")) - ); + verifyHeader(row); // Key verification Assert.assertEquals("sampleKey", Iterables.getOnlyElement(row.getDimension("kafka.newkey.key"))); @@ -806,29 +691,102 @@ public void testWithSchemaDiscovery() throws IOException } @Test - public void testWithPartialDeclarationSchemaDiscovery() throws IOException + public void testValueInCsvFormat() throws IOException { - // testWithHeaderKeyAndValue + partial-schema + schema discovery - final byte[] key = StringUtils.toUtf8( - "{\n" - + " \"key\": \"sampleKey\"\n" - + "}" + format = new KafkaInputFormat( + new KafkaStringHeaderFormat(null), + // Key Format + new JsonInputFormat( + new JSONPathSpec(true, ImmutableList.of()), + null, + null, + false, + false + ), + // Value Format + new CsvInputFormat( + Arrays.asList("foo", "bar", "timestamp", "baz"), + null, + false, + false, + 0 + ), + "kafka.newheader.", + "kafka.newkey.key", + "kafka.newts.timestamp", + "kafka.newtopic.topic" ); - final byte[] payload = StringUtils.toUtf8( - "{\n" - + " \"timestamp\": \"2021-06-25\",\n" - + " \"bar\": null,\n" - + " \"foo\": \"x\",\n" - + " \"baz\": 4,\n" - + " \"o\": {\n" - + " \"mg\": 1\n" - + " }\n" - + "}" + Headers headers = new RecordHeaders(SAMPLE_HEADERS); + KafkaRecordEntity inputEntity = + makeInputEntity(SIMPLE_JSON_KEY_BYTES, StringUtils.toUtf8("x,,2021-06-25,4"), headers); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec( + DimensionsSpec.getDefaultSchemas( + ImmutableList.of( + "bar", + "foo", + "kafka.newheader.encoding", + "kafka.newheader.kafkapkc", + "kafka.newts.timestamp", + "kafka.newtopic.topic" + ) + ) + ), + ColumnsFilter.all() + ), + newSettableByteEntity(inputEntity), + null ); + final int numExpectedIterations = 1; + try (CloseableIterator iterator = reader.read()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + + final InputRow row = iterator.next(); + Assert.assertEquals( + Arrays.asList( + "bar", + "foo", + "kafka.newheader.encoding", + "kafka.newheader.kafkapkc", + "kafka.newts.timestamp", + "kafka.newtopic.topic" + ), + row.getDimensions() + ); + // Payload verifications + // this isn't super realistic, since most of these columns are not actually defined in the dimensionSpec + // but test reading them anyway since it isn't technically illegal + + Assert.assertEquals(DateTimes.of("2021-06-25"), row.getTimestamp()); + Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); + Assert.assertTrue(row.getDimension("bar").isEmpty()); + + verifyHeader(row); + + // Key verification + Assert.assertEquals("sampleKey", Iterables.getOnlyElement(row.getDimension("kafka.newkey.key"))); + + numActualIterations++; + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + + @Test + public void testWithPartialDeclarationSchemaDiscovery() throws IOException + { + // testWithHeaderKeyAndValue + partial-schema + schema discovery Headers headers = new RecordHeaders(SAMPLE_HEADERS); - inputEntity = makeInputEntity(key, payload, headers); + KafkaRecordEntity inputEntity = + makeInputEntity(SIMPLE_JSON_KEY_BYTES, SIMPLE_JSON_VALUE_BYTES, headers); final InputEntityReader reader = format.createReader( new InputRowSchema( @@ -879,21 +837,7 @@ public void testWithPartialDeclarationSchemaDiscovery() throws IOException Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg"))); Assert.assertEquals(ImmutableMap.of("mg", 1L), row.getRaw("o")); - // Header verification - Assert.assertEquals("application/json", Iterables.getOnlyElement(row.getDimension("kafka.newheader.encoding"))); - Assert.assertEquals("pkc-bar", Iterables.getOnlyElement(row.getDimension("kafka.newheader.kafkapkc"))); - Assert.assertEquals( - String.valueOf(DateTimes.of("2021-06-24").getMillis()), - Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp")) - ); - Assert.assertEquals( - TOPIC, - Iterables.getOnlyElement(row.getDimension("kafka.newtopic.topic")) - ); - Assert.assertEquals( - "2021-06-25", - Iterables.getOnlyElement(row.getDimension("timestamp")) - ); + verifyHeader(row); // Key verification Assert.assertEquals("sampleKey", Iterables.getOnlyElement(row.getDimension("kafka.newkey.key"))); @@ -916,7 +860,7 @@ private KafkaRecordEntity makeInputEntity(byte[] key, byte[] payload, Headers he TOPIC, 0, 0, - timestamp, + TIMESTAMP_MILLIS, null, 0, 0, @@ -928,6 +872,23 @@ private KafkaRecordEntity makeInputEntity(byte[] key, byte[] payload, Headers he ); } + private void verifyHeader(InputRow row) + { + Assert.assertEquals("application/json", Iterables.getOnlyElement(row.getDimension("kafka.newheader.encoding"))); + Assert.assertEquals("pkc-bar", Iterables.getOnlyElement(row.getDimension("kafka.newheader.kafkapkc"))); + Assert.assertEquals( + String.valueOf(DateTimes.of("2021-06-24").getMillis()), + Iterables.getOnlyElement(row.getDimension("kafka.newts.timestamp")) + ); + Assert.assertEquals( + TOPIC, + Iterables.getOnlyElement(row.getDimension("kafka.newtopic.topic")) + ); + Assert.assertEquals( + "2021-06-25", + Iterables.getOnlyElement(row.getDimension("timestamp")) + ); + } private SettableByteEntity newSettableByteEntity(KafkaRecordEntity kafkaRecordEntity) { From 9ad23c0cb06208a74707ade187a618b7fedab772 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Fri, 21 Jun 2024 13:14:19 +0530 Subject: [PATCH 4/5] Do not use forbidden APIs --- .../kafkainput/KafkaInputFormatTest.java | 22 ++++++++++--------- .../druid/java/util/common/StringUtils.java | 17 ++++++++++++++ .../druid/common/utils/StringUtilsTest.java | 9 ++++++++ 3 files changed, 38 insertions(+), 10 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java index b7b57b6004b8..7f1a532a0f64 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java @@ -66,18 +66,20 @@ public class KafkaInputFormatTest private static final long TIMESTAMP_MILLIS = DateTimes.of("2021-06-24").getMillis(); private static final String TOPIC = "sample"; private static final byte[] SIMPLE_JSON_KEY_BYTES = StringUtils.toUtf8( - "{'key': 'sampleKey'}" - .replaceAll("'", "\"") + StringUtils.singleQuoteToStandardJson( + "{'key': 'sampleKey'}" + ) ); private static final byte[] SIMPLE_JSON_VALUE_BYTES = StringUtils.toUtf8( - ("{" - + " 'timestamp': '2021-06-25'," - + " 'bar': null," - + " 'foo': 'x'," - + " 'baz': 4," - + " 'o': {'mg': 1}" - + "}") - .replaceAll("'", "\"") + StringUtils.singleQuoteToStandardJson( + "{" + + " 'timestamp': '2021-06-25'," + + " 'bar': null," + + " 'foo': 'x'," + + " 'baz': 4," + + " 'o': {'mg': 1}" + + "}" + ) ); private static final Iterable
SAMPLE_HEADERS = ImmutableList.of( diff --git a/processing/src/main/java/org/apache/druid/java/util/common/StringUtils.java b/processing/src/main/java/org/apache/druid/java/util/common/StringUtils.java index e5d9b2c8e4ec..b4f3113689dd 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/StringUtils.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/StringUtils.java @@ -349,6 +349,23 @@ public static byte[] toUtf8Nullable(@Nullable final String string) return toUtf8(string); } + /** + * Converts the given JSON string which uses single quotes for field names and + * String values to a standard JSON by replacing all occurrences of a single + * quote with double quotes. + *

+ * Single-quoted JSON is typically easier to read as can be seen below: + *

+   * final String singleQuotedJson = "{'f1':'value', 'f2':5}";
+   *
+   * final String doubleQuotedJson = "{\"f1\":\"value\", \"f2\":5}";
+   * 
+ */ + public static String singleQuoteToStandardJson(String singleQuotedJson) + { + return replaceChar(singleQuotedJson, '\'', "\""); + } + /** * Equivalent of String.format(Locale.ENGLISH, message, formatArgs). */ diff --git a/processing/src/test/java/org/apache/druid/common/utils/StringUtilsTest.java b/processing/src/test/java/org/apache/druid/common/utils/StringUtilsTest.java index 787d5daa79da..72a084e0a082 100644 --- a/processing/src/test/java/org/apache/druid/common/utils/StringUtilsTest.java +++ b/processing/src/test/java/org/apache/druid/common/utils/StringUtilsTest.java @@ -87,4 +87,13 @@ public void testToUtf8ByteBuffer() Assert.assertEquals(ByteBuffer.wrap(StringUtils.toUtf8("foo")), StringUtils.toUtf8ByteBuffer("foo")); Assert.assertEquals(ByteBuffer.wrap(StringUtils.toUtf8("🙂")), StringUtils.toUtf8ByteBuffer("🙂")); } + + @Test + public void testSingleQuoteToStandardJson() + { + Assert.assertEquals( + "{\"stringField\":\"stringValue\", \"numberField\":5}", + StringUtils.singleQuoteToStandardJson("{'stringField':'stringValue', 'numberField':5}") + ); + } } From 12160dedf8477b04b15f9d50d940d52026ccb130 Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Tue, 25 Jun 2024 07:46:35 +0530 Subject: [PATCH 5/5] Move utility method to TestUtils --- .../input/kafkainput/KafkaInputFormatTest.java | 5 +++-- .../druid/indexing/common/TestUtils.java | 18 ++++++++++++++++++ .../druid/java/util/common/StringUtils.java | 17 ----------------- .../druid/common/utils/StringUtilsTest.java | 9 --------- 4 files changed, 21 insertions(+), 28 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java index 7f1a532a0f64..adbb7c4b6779 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java @@ -34,6 +34,7 @@ import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.data.input.kafka.KafkaRecordEntity; +import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.seekablestream.SettableByteEntity; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; @@ -66,12 +67,12 @@ public class KafkaInputFormatTest private static final long TIMESTAMP_MILLIS = DateTimes.of("2021-06-24").getMillis(); private static final String TOPIC = "sample"; private static final byte[] SIMPLE_JSON_KEY_BYTES = StringUtils.toUtf8( - StringUtils.singleQuoteToStandardJson( + TestUtils.singleQuoteToStandardJson( "{'key': 'sampleKey'}" ) ); private static final byte[] SIMPLE_JSON_VALUE_BYTES = StringUtils.toUtf8( - StringUtils.singleQuoteToStandardJson( + TestUtils.singleQuoteToStandardJson( "{" + " 'timestamp': '2021-06-25'," + " 'bar': null," diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java index bac4261e0fa5..7dda3b8ff62c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java @@ -36,6 +36,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTaskClientProvider; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable; @@ -174,4 +175,21 @@ public static boolean conditionValid(IndexingServiceCondition condition, long ti } return true; } + + /** + * Converts the given JSON string which uses single quotes for field names and + * String values to a standard JSON by replacing all occurrences of a single + * quote with double quotes. + *

+ * Single-quoted JSON is typically easier to read as can be seen below: + *

+   * final String singleQuotedJson = "{'f1':'value', 'f2':5}";
+   *
+   * final String doubleQuotedJson = "{\"f1\":\"value\", \"f2\":5}";
+   * 
+ */ + public static String singleQuoteToStandardJson(String singleQuotedJson) + { + return StringUtils.replaceChar(singleQuotedJson, '\'', "\""); + } } diff --git a/processing/src/main/java/org/apache/druid/java/util/common/StringUtils.java b/processing/src/main/java/org/apache/druid/java/util/common/StringUtils.java index b4f3113689dd..e5d9b2c8e4ec 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/StringUtils.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/StringUtils.java @@ -349,23 +349,6 @@ public static byte[] toUtf8Nullable(@Nullable final String string) return toUtf8(string); } - /** - * Converts the given JSON string which uses single quotes for field names and - * String values to a standard JSON by replacing all occurrences of a single - * quote with double quotes. - *

- * Single-quoted JSON is typically easier to read as can be seen below: - *

-   * final String singleQuotedJson = "{'f1':'value', 'f2':5}";
-   *
-   * final String doubleQuotedJson = "{\"f1\":\"value\", \"f2\":5}";
-   * 
- */ - public static String singleQuoteToStandardJson(String singleQuotedJson) - { - return replaceChar(singleQuotedJson, '\'', "\""); - } - /** * Equivalent of String.format(Locale.ENGLISH, message, formatArgs). */ diff --git a/processing/src/test/java/org/apache/druid/common/utils/StringUtilsTest.java b/processing/src/test/java/org/apache/druid/common/utils/StringUtilsTest.java index 72a084e0a082..787d5daa79da 100644 --- a/processing/src/test/java/org/apache/druid/common/utils/StringUtilsTest.java +++ b/processing/src/test/java/org/apache/druid/common/utils/StringUtilsTest.java @@ -87,13 +87,4 @@ public void testToUtf8ByteBuffer() Assert.assertEquals(ByteBuffer.wrap(StringUtils.toUtf8("foo")), StringUtils.toUtf8ByteBuffer("foo")); Assert.assertEquals(ByteBuffer.wrap(StringUtils.toUtf8("🙂")), StringUtils.toUtf8ByteBuffer("🙂")); } - - @Test - public void testSingleQuoteToStandardJson() - { - Assert.assertEquals( - "{\"stringField\":\"stringValue\", \"numberField\":5}", - StringUtils.singleQuoteToStandardJson("{'stringField':'stringValue', 'numberField':5}") - ); - } }