From dabdb832cf8acd7ea475705616fd50d1ab8212a6 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Wed, 14 Sep 2022 04:26:53 -0500 Subject: [PATCH 1/3] Add JsonInputFormat option to assume newline delimited JSON, improve handling for non-NDJSON --- .../data/input/impl/JsonInputFormat.java | 61 ++- .../druid/data/input/impl/JsonNodeReader.java | 182 +++++++ .../impl/CloudObjectInputSourceTest.java | 8 +- .../data/input/impl/JsonInputFormatTest.java | 12 +- .../data/input/impl/JsonLineReaderTest.java | 16 +- .../data/input/impl/JsonNodeReaderTest.java | 475 ++++++++++++++++++ .../druid/data/input/impl/JsonReaderTest.java | 28 +- docs/ingestion/data-formats.md | 7 + .../data/input/aliyun/OssInputSourceTest.java | 10 +- .../GoogleCloudStorageInputSourceTest.java | 8 +- .../kafkainput/KafkaInputFormatTest.java | 15 +- .../indexing/kafka/KafkaSamplerSpecTest.java | 4 +- .../kafka/supervisor/KafkaSupervisorTest.java | 2 + .../kinesis/KinesisSamplerSpecTest.java | 2 +- .../supervisor/KinesisSupervisorTest.java | 3 + .../druid/msq/querykit/DataSourcePlan.java | 2 +- .../apache/druid/msq/exec/MSQSelectTest.java | 2 +- .../msq/indexing/error/MSQWarningsTest.java | 2 +- .../external/ExternalInputSpecSlicerTest.java | 2 +- .../data/input/s3/S3InputSourceTest.java | 18 +- .../indexing/common/task/IndexTaskTest.java | 2 +- ...aseParallelIndexingWithNullColumnTest.java | 6 +- .../ParallelIndexSupervisorTaskTest.java | 2 +- .../parallel/ParallelIndexTestingFactory.java | 2 +- .../PartialHashSegmentGenerateTaskTest.java | 4 +- .../SinglePhaseParallelIndexingTest.java | 2 + .../parallel/SinglePhaseSubTaskSpecTest.java | 2 +- .../sampler/InputSourceSamplerTest.java | 2 +- .../SeekableStreamIndexTaskTestBase.java | 2 + .../SeekableStreamSupervisorSpecTest.java | 6 +- .../seekablestream/StreamChunkParserTest.java | 6 +- .../SeekableStreamSupervisorStateTest.java | 4 +- 32 files changed, 828 insertions(+), 71 deletions(-) create mode 100644 
core/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java create mode 100644 core/src/test/java/org/apache/druid/data/input/impl/JsonNodeReaderTest.java diff --git a/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java index 5e18ddf58ac2..233d205fe4e3 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java @@ -55,21 +55,42 @@ public class JsonInputFormat extends NestedInputFormat */ private final boolean lineSplittable; + /** + * If true, overrides the behavior controlled by lineSplittable and the parsing will assume that the input + * is newline delimited. This is to allow streaming ingestion to use the newline delimited parser when the + * input is known to follow that format, since this allows for more flexible handling of parse errors (i.e., + * an invalid JSON event on one line will not prevent other events on different lines from being ingested). + */ + private final boolean assumeNewlineDelimited; + + /** + * If true, use the JsonNodeReader when parsing non-newline delimited JSON. This parser splits the input string + * into JsonNode objects, parsing each JsonNode into a separate InputRow, instead of parsing the input string + * into several InputRow at the same time. This allows for more flexible handling of parse errors, where timestamp + * parsing errors do not prevent other events in the same string from being ingested, and allows valid events prior to + * encountering invalid JSON syntax to be ingested as well. 
+ */ + private final boolean useJsonNodeReader; + @JsonCreator public JsonInputFormat( @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec, @JsonProperty("featureSpec") @Nullable Map featureSpec, - @JsonProperty("keepNullColumns") @Nullable Boolean keepNullColumns + @JsonProperty("keepNullColumns") @Nullable Boolean keepNullColumns, + @JsonProperty("assumeNewlineDelimited") @Nullable Boolean assumeNewlineDelimited, + @JsonProperty("useJsonNodeReader") @Nullable Boolean useJsonNodeReader ) { - this(flattenSpec, featureSpec, keepNullColumns, true); + this(flattenSpec, featureSpec, keepNullColumns, true, assumeNewlineDelimited, useJsonNodeReader); } public JsonInputFormat( @Nullable JSONPathSpec flattenSpec, Map featureSpec, Boolean keepNullColumns, - boolean lineSplittable + boolean lineSplittable, + Boolean assumeNewlineDelimited, + Boolean useJsonNodeReader ) { super(flattenSpec); @@ -85,6 +106,8 @@ public JsonInputFormat( objectMapper.configure(feature, entry.getValue()); } this.lineSplittable = lineSplittable; + this.assumeNewlineDelimited = assumeNewlineDelimited != null && assumeNewlineDelimited; + this.useJsonNodeReader = useJsonNodeReader != null && useJsonNodeReader; } @JsonProperty @@ -109,9 +132,13 @@ public boolean isSplittable() @Override public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory) { - return this.lineSplittable ? 
- new JsonLineReader(inputRowSchema, source, getFlattenSpec(), objectMapper, keepNullColumns) : - new JsonReader(inputRowSchema, source, getFlattenSpec(), objectMapper, keepNullColumns); + if (this.lineSplittable || this.assumeNewlineDelimited) { + return new JsonLineReader(inputRowSchema, source, getFlattenSpec(), objectMapper, keepNullColumns); + } else if (this.useJsonNodeReader) { + return new JsonNodeReader(inputRowSchema, source, getFlattenSpec(), objectMapper, keepNullColumns); + } else { + return new JsonReader(inputRowSchema, source, getFlattenSpec(), objectMapper, keepNullColumns); + } } /** @@ -124,7 +151,10 @@ public JsonInputFormat withLineSplittable(boolean lineSplittable) return new JsonInputFormat(this.getFlattenSpec(), this.getFeatureSpec(), this.keepNullColumns, - lineSplittable); + lineSplittable, + assumeNewlineDelimited, + useJsonNodeReader + ); } @Override @@ -140,12 +170,25 @@ public boolean equals(Object o) return false; } JsonInputFormat that = (JsonInputFormat) o; - return this.lineSplittable == that.lineSplittable && Objects.equals(featureSpec, that.featureSpec) && Objects.equals(keepNullColumns, that.keepNullColumns); + return keepNullColumns == that.keepNullColumns + && lineSplittable == that.lineSplittable + && assumeNewlineDelimited == that.assumeNewlineDelimited + && useJsonNodeReader == that.useJsonNodeReader + && Objects.equals(featureSpec, that.featureSpec) + && Objects.equals(objectMapper, that.objectMapper); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), featureSpec, keepNullColumns, lineSplittable); + return Objects.hash( + super.hashCode(), + featureSpec, + objectMapper, + keepNullColumns, + lineSplittable, + assumeNewlineDelimited, + useJsonNodeReader + ); } } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java b/core/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java new file mode 100644 index 000000000000..995172c52eee --- /dev/null +++ 
b/core/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.MappingIterator; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.commons.io.IOUtils; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.IntermediateRowParsingReader; +import org.apache.druid.java.util.common.CloseableIterators; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.java.util.common.parsers.JSONFlattenerMaker; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import 
org.apache.druid.java.util.common.parsers.ObjectFlattener; +import org.apache.druid.java.util.common.parsers.ObjectFlatteners; +import org.apache.druid.java.util.common.parsers.ParseException; +import org.apache.druid.utils.CollectionUtils; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * In contrast to {@link JsonLineReader} which processes input text line by line independently, + * this class tries to split the input into a list of JsonNode objects, and then parses each JsonNode independently + * into an InputRow. + * + *

+ The input text can be: + 1. a JSON string of an object in a line or multiple lines (such as pretty-printed JSON text) + 2. multiple JSON object strings concatenated by whitespace character(s) + *

+ * If an input string contains invalid JSON syntax, any valid JSON objects found prior to encountering the invalid + * syntax will be successfully parsed, but parsing will not continue after the invalid syntax. + *

+ */ +public class JsonNodeReader extends IntermediateRowParsingReader +{ + private final ObjectFlattener flattener; + private final ObjectMapper mapper; + private final InputEntity source; + private final InputRowSchema inputRowSchema; + private final JsonFactory jsonFactory; + + JsonNodeReader( + InputRowSchema inputRowSchema, + InputEntity source, + JSONPathSpec flattenSpec, + ObjectMapper mapper, + boolean keepNullColumns + ) + { + this.inputRowSchema = inputRowSchema; + this.source = source; + this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(keepNullColumns)); + this.mapper = mapper; + this.jsonFactory = new JsonFactory(); + } + + @Override + protected CloseableIterator intermediateRowIterator() throws IOException + { + final String sourceString = IOUtils.toString(source.open(), StringUtils.UTF8_STRING); + final List jsonNodes = new ArrayList<>(); + try { + JsonParser parser = jsonFactory.createParser(sourceString); + final MappingIterator delegate = mapper.readValues(parser, JsonNode.class); + while (delegate.hasNext()) { + jsonNodes.add(delegate.next()); + } + } + catch (Exception e) { + //convert Jackson's JsonParseException into druid's exception for further processing + //JsonParseException will be thrown from MappingIterator#hasNext or MappingIterator#next when input json text is ill-formed + if (e.getCause() instanceof JsonParseException) { + jsonNodes.add( + new ParseExceptionMarkerJsonNode( + new ParseException(sourceString, e, "Unable to parse row [%s]", sourceString) + ) + ); + } else { + throw e; + } + } + + if (CollectionUtils.isNullOrEmpty(jsonNodes)) { + jsonNodes.add( + new ParseExceptionMarkerJsonNode( + new ParseException( + sourceString, + "Unable to parse [%s] as the intermediateRow resulted in empty input row", + sourceString + ) + ) + ); + } + return CloseableIterators.withEmptyBaggage(jsonNodes.iterator()); + } + + @Override + protected InputEntity source() + { + return source; + } + + @Override + protected 
List parseInputRows(JsonNode intermediateRow) throws IOException, ParseException + { + if (intermediateRow instanceof ParseExceptionMarkerJsonNode) { + throw ((ParseExceptionMarkerJsonNode) intermediateRow).getParseException(); + } + final List inputRows = Collections.singletonList( + MapInputRowParser.parse(inputRowSchema, flattener.flatten(intermediateRow)) + ); + + if (CollectionUtils.isNullOrEmpty(inputRows)) { + throw new ParseException( + intermediateRow.toString(), + "Unable to parse [%s] as the intermediateRow resulted in empty input row", + intermediateRow.toString() + ); + } + return inputRows; + } + + @Override + protected List> toMap(JsonNode intermediateRow) throws IOException + { + if (intermediateRow instanceof ParseExceptionMarkerJsonNode) { + throw ((ParseExceptionMarkerJsonNode) intermediateRow).getParseException(); + } + return Collections.singletonList( + mapper.readValue(intermediateRow.toString(), new TypeReference>() + { + }) + ); + } + + private static class ParseExceptionMarkerJsonNode extends ObjectNode + { + final ParseException parseException; + + public ParseExceptionMarkerJsonNode(ParseException pe) + { + super(null); + this.parseException = pe; + } + + public ParseException getParseException() + { + return parseException; + } + } +} diff --git a/core/src/test/java/org/apache/druid/data/input/impl/CloudObjectInputSourceTest.java b/core/src/test/java/org/apache/druid/data/input/impl/CloudObjectInputSourceTest.java index 70421c6afaf0..7555e6afa59c 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/CloudObjectInputSourceTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/CloudObjectInputSourceTest.java @@ -131,7 +131,7 @@ public void testWithUrisFilter() ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), new MaxSizeSplitHintSpec(null, 1) ); @@ -152,7 +152,7 @@ public void testWithUris() ); 
Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), new MaxSizeSplitHintSpec(null, 1) ); @@ -173,7 +173,7 @@ public void testWithObjectsFilter() ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), new MaxSizeSplitHintSpec(null, 1) ); @@ -194,7 +194,7 @@ public void testWithObjects() ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), new MaxSizeSplitHintSpec(null, 1) ); diff --git a/core/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java b/core/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java index 0577dd72fb46..d7afec9dffb4 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java @@ -52,7 +52,9 @@ public void testSerde() throws IOException ) ), ImmutableMap.of(Feature.ALLOW_COMMENTS.name(), true, Feature.ALLOW_UNQUOTED_FIELD_NAMES.name(), false), - true + true, + false, + false ); final byte[] bytes = mapper.writeValueAsBytes(format); final JsonInputFormat fromJson = (JsonInputFormat) mapper.readValue(bytes, InputFormat.class); @@ -79,6 +81,8 @@ public void test_unsetUseFieldDiscovery_unsetKeepNullColumnsByDefault() final JsonInputFormat format = new JsonInputFormat( new JSONPathSpec(false, null), null, + null, + null, null ); Assert.assertFalse(format.isKeepNullColumns()); @@ -90,6 +94,8 @@ public void testUseFieldDiscovery_setKeepNullColumnsByDefault() final JsonInputFormat format = new JsonInputFormat( new JSONPathSpec(true, null), null, + null, + null, null ); Assert.assertTrue(format.isKeepNullColumns()); @@ -101,7 +107,9 @@ public void 
testUseFieldDiscovery_doNotChangeKeepNullColumnsUserSets() final JsonInputFormat format = new JsonInputFormat( new JSONPathSpec(true, null), null, - false + false, + null, + null ); Assert.assertFalse(format.isKeepNullColumns()); } diff --git a/core/src/test/java/org/apache/druid/data/input/impl/JsonLineReaderTest.java b/core/src/test/java/org/apache/druid/data/input/impl/JsonLineReaderTest.java index c61ad4921bb7..18c00578e292 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/JsonLineReaderTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/JsonLineReaderTest.java @@ -56,6 +56,8 @@ public void testParseRow() throws IOException ) ), null, + null, + null, null ); @@ -106,6 +108,8 @@ public void testParseRowWithConditional() throws IOException ) ), null, + null, + null, null ); @@ -148,7 +152,9 @@ public void testParseRowKeepNullColumns() throws IOException ) ), null, - true + true, + null, + null ); final ByteEntity source = new ByteEntity( @@ -190,7 +196,9 @@ public void testKeepNullColumnsWithNoNullValues() throws IOException ) ), null, - true + true, + null, + null ); final ByteEntity source = new ByteEntity( @@ -232,7 +240,9 @@ public void testFalseKeepNullColumns() throws IOException ) ), null, - false + false, + null, + null ); final ByteEntity source = new ByteEntity( diff --git a/core/src/test/java/org/apache/druid/data/input/impl/JsonNodeReaderTest.java b/core/src/test/java/org/apache/druid/data/input/impl/JsonNodeReaderTest.java new file mode 100644 index 000000000000..50c372ff3d6f --- /dev/null +++ b/core/src/test/java/org/apache/druid/data/input/impl/JsonNodeReaderTest.java @@ -0,0 +1,475 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import org.apache.druid.data.input.ColumnsFilter; +import org.apache.druid.data.input.InputEntityReader; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowListPlusRawValues; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.java.util.common.parsers.JSONPathFieldSpec; +import org.apache.druid.java.util.common.parsers.JSONPathFieldType; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.java.util.common.parsers.ParseException; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; + +public class JsonNodeReaderTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testParseMultipleRows() throws IOException + { + final JsonInputFormat format = new JsonInputFormat( + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"), + new 
JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") + ) + ), + null, + null, + false, //make sure JsonReader is used, + false, + true + ); + + final ByteEntity source = new ByteEntity( + StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":1}}" + + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":2}}\n" + + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":3}}\n") + ); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))), + ColumnsFilter.all() + ), + source, + null + ); + + final int numExpectedIterations = 3; + try (CloseableIterator iterator = reader.read()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + + final InputRow row = iterator.next(); + + final String msgId = String.valueOf(++numActualIterations); + Assert.assertEquals(DateTimes.of("2019-01-01"), row.getTimestamp()); + Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); + Assert.assertEquals(msgId, Iterables.getOnlyElement(row.getDimension("path_omg"))); + Assert.assertEquals(msgId, Iterables.getOnlyElement(row.getDimension("jq_omg"))); + + Assert.assertTrue(row.getDimension("root_baz2").isEmpty()); + Assert.assertTrue(row.getDimension("path_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + } + + 
Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + + @Test + public void testParsePrettyFormatJSON() throws IOException + { + final JsonInputFormat format = new JsonInputFormat( + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") + ) + ), + null, + null, + false, //make sure JsonReader is used + false, + true + ); + + final ByteEntity source = new ByteEntity( + StringUtils.toUtf8("{\n" + + " \"timestamp\": \"2019-01-01\",\n" + + " \"bar\": null,\n" + + " \"foo\": \"x\",\n" + + " \"baz\": 4,\n" + + " \"o\": {\n" + + " \"mg\": 1\n" + + " }\n" + + "}") + ); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))), + ColumnsFilter.all() + ), + source, + null + ); + + final int numExpectedIterations = 1; + try (CloseableIterator iterator = reader.read()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + + final InputRow row = iterator.next(); + + Assert.assertEquals(DateTimes.of("2019-01-01"), row.getTimestamp()); + Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg"))); + Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg"))); + + 
Assert.assertTrue(row.getDimension("root_baz2").isEmpty()); + Assert.assertTrue(row.getDimension("path_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + + numActualIterations++; + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + + @Test + public void testInvalidJSONText() throws IOException + { + final JsonInputFormat format = new JsonInputFormat( + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") + ) + ), + null, + null, + false, //make sure JsonReader is used + false, + true + ); + + final ByteEntity source = new ByteEntity( + StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":1}}" + + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4xxx,\"o\":{\"mg\":2}}" + //baz property is illegal + + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":3}}") + ); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))), + ColumnsFilter.all() + ), + source, + null + ); + + //expect a ParseException on the following `next` call on iterator + expectedException.expect(ParseException.class); + + // the 2nd line is ill-formed, so the parse of this text chunk aborts + final int numExpectedIterations = 0; + + try (CloseableIterator iterator = reader.read()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + iterator.next(); + ++numActualIterations; + } + 
+ Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + + @Test + public void testSampleMultipleRows() throws IOException + { + final JsonInputFormat format = new JsonInputFormat( + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") + ) + ), + null, + null, + false, //make sure JsonReader is used + false, + true + ); + + final ByteEntity source = new ByteEntity( + StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":1}}" + + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":2}}\n" + + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":3}}\n") + ); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))), + ColumnsFilter.all() + ), + source, + null + ); + + int acturalRowCount = 0; + try (CloseableIterator iterator = reader.sample()) { + while (iterator.hasNext()) { + + final InputRowListPlusRawValues rawValues = iterator.next(); + + // 1 row returned 3 times + Assert.assertEquals(1, rawValues.getInputRows().size()); + InputRow row = rawValues.getInputRows().get(0); + + final String msgId = String.valueOf(++acturalRowCount); + Assert.assertEquals(DateTimes.of("2019-01-01"), row.getTimestamp()); + Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo"))); + Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz"))); + Assert.assertEquals("4", 
Iterables.getOnlyElement(row.getDimension("root_baz"))); + Assert.assertEquals(msgId, Iterables.getOnlyElement(row.getDimension("path_omg"))); + Assert.assertEquals(msgId, Iterables.getOnlyElement(row.getDimension("jq_omg"))); + + Assert.assertTrue(row.getDimension("root_baz2").isEmpty()); + Assert.assertTrue(row.getDimension("path_omg2").isEmpty()); + Assert.assertTrue(row.getDimension("jq_omg2").isEmpty()); + + } + } + + Assert.assertEquals(3, acturalRowCount); + } + + @Test + public void testSamplInvalidJSONText() throws IOException + { + final JsonInputFormat format = new JsonInputFormat( + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") + ) + ), + null, + null, + false, //make sure JsonReader is used + false, + true + ); + + //2nd row is has an invalid timestamp which causes a parse exception, but is valid JSON + //3rd row is malformed json and terminates the row iteration + final ByteEntity source = new ByteEntity( + StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":1}}" + + "{\"timestamp\":\"invalidtimestamp\",\"bar\":null,\"foo\":\"x\",\"baz\":5,\"o\":{\"mg\":2}}\n" + + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4xxx,\"o\":{\"mg\":2}}\n" + //value of baz is invalid + + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":3}}\n") + ); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))), + 
ColumnsFilter.all() + ), + source, + null + ); + + // the invalid timestamp in line 2 causes a parse exception, but because it is valid JSON, parsing can continue + // the invalid character in line 3 stops parsing of the 4-line text as a whole + // so the total num of iteration is 3 + final int numExpectedIterations = 3; + + try (CloseableIterator iterator = reader.sample()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + numActualIterations++; + + final InputRowListPlusRawValues rawValues = iterator.next(); + + if (numActualIterations == 2 || numActualIterations == 3) { + Assert.assertNotNull(rawValues.getParseException()); + } + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + + @Test + public void testEmptyJSONText() throws IOException + { + final JsonInputFormat format = new JsonInputFormat( + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") + ) + ), + null, + null, + false, //make sure JsonReader is used + false, + true + ); + + //input is empty + final ByteEntity source = new ByteEntity( + StringUtils.toUtf8( + "" // empty row + ) + ); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))), + ColumnsFilter.all() + ), + source, + null + ); + + //expect a ParseException on the following `next` call on iterator + expectedException.expect(ParseException.class); + + // the 2nd line is ill-formed, so the parse of this text chunk aborts + final int 
numExpectedIterations = 0; + + try (CloseableIterator iterator = reader.read()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + iterator.next(); + ++numActualIterations; + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } + + + + @Test + public void testSampleEmptyText() throws IOException + { + final JsonInputFormat format = new JsonInputFormat( + new JSONPathSpec( + true, + ImmutableList.of( + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"), + new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"), + new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") + ) + ), + null, + null, + false, //make sure JsonReader is used + false, + true + ); + + //input is empty + final ByteEntity source = new ByteEntity( + StringUtils.toUtf8("") + ); + + final InputEntityReader reader = format.createReader( + new InputRowSchema( + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))), + ColumnsFilter.all() + ), + source, + null + ); + + // the total num of iteration is 1 + final int numExpectedIterations = 1; + + try (CloseableIterator iterator = reader.sample()) { + int numActualIterations = 0; + while (iterator.hasNext()) { + numActualIterations++; + + final InputRowListPlusRawValues rawValues = iterator.next(); + + Assert.assertNotNull(rawValues.getParseException()); + } + + Assert.assertEquals(numExpectedIterations, numActualIterations); + } + } +} diff --git a/core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java b/core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java index 7ab52a095d51..3374844ecfc2 100644 --- 
a/core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java +++ b/core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java @@ -62,7 +62,9 @@ public void testParseMultipleRows() throws IOException ), null, null, - false //make sure JsonReader is used + false, //make sure JsonReader is used + false, + false ); final ByteEntity source = new ByteEntity( @@ -122,7 +124,9 @@ public void testParsePrettyFormatJSON() throws IOException ), null, null, - false //make sure JsonReader is used + false, //make sure JsonReader is used + false, + false ); final ByteEntity source = new ByteEntity( @@ -189,7 +193,9 @@ public void testInvalidJSONText() throws IOException ), null, null, - false //make sure JsonReader is used + false, //make sure JsonReader is used + false, + false ); final ByteEntity source = new ByteEntity( @@ -243,7 +249,9 @@ public void testSampleMultipleRows() throws IOException ), null, null, - false //make sure JsonReader is used + false, //make sure JsonReader is used + false, + false ); final ByteEntity source = new ByteEntity( @@ -309,7 +317,9 @@ public void testSamplInvalidJSONText() throws IOException ), null, null, - false //make sure JsonReader is used + false, //make sure JsonReader is used + false, + false ); //2nd row is ill-formed @@ -365,7 +375,9 @@ public void testEmptyJSONText() throws IOException ), null, null, - false //make sure JsonReader is used + false, //make sure JsonReader is used + false, + false ); //input is empty @@ -421,7 +433,9 @@ public void testSampleEmptyText() throws IOException ), null, null, - false //make sure JsonReader is used + false, //make sure JsonReader is used + false, + false ); //input is empty diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index bb1b659c49bd..0b696fe2e1e4 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -91,6 +91,13 @@ Configure the JSON `inputFormat` to load JSON data as follows: | flattenSpec | JSON Object 
| Specifies flattening configuration for nested JSON data. See [`flattenSpec`](#flattenspec) for more info. | no | | featureSpec | JSON Object | [JSON parser features](https://github.com/FasterXML/jackson-core/wiki/JsonParser-Features) supported by Jackson, a JSON processor for Java. The features control parsing of the input JSON data. To enable a feature, map the feature name to a Boolean value of "true". For example: `"featureSpec": {"ALLOW_SINGLE_QUOTES": true, "ALLOW_UNQUOTED_FIELD_NAMES": true}` | no | +The following properties are specialized properties that only apply when the JSON `inputFormat` is used in streaming ingestion, and they are related to how parsing exceptions are handled. In streaming ingestion, multi-line JSON events can be ingested (i.e. where a single JSON event spans multiple lines). However, if a parsing exception occurs, all JSON events that are present in the same streaming record will be discarded. + +| Field | Type | Description | Required | +|-------|------|-------------|----------| +| assumeNewlineDelimited | Boolean | If the input is known to be newline delimited JSON (each individual JSON event is contained in a single line, separated by newlines), setting this option to true allows for more flexible parsing exception handling. Only the lines with invalid JSON syntax will be discarded, while lines containing valid JSON events will still be ingested. | no (Default false) | +| useJsonNodeReader | Boolean | When ingesting multi-line JSON events, enabling this option will enable the use of a JSON parser which will retain any valid JSON events encountered within a streaming record prior to when a parsing exception occured. 
| no (Default false) | + For example: ```json "ioConfig": { diff --git a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java index 6a677325a334..817167df38a9 100644 --- a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java +++ b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java @@ -327,7 +327,7 @@ public void testWithUrisSplit() ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null ); @@ -353,7 +353,7 @@ public void testWithPrefixesSplit() ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), new MaxSizeSplitHintSpec(null, 1) ); @@ -380,7 +380,7 @@ public void testCreateSplitsWithSplitHintSpecRespectingHint() ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), new MaxSizeSplitHintSpec(new HumanReadableBytes(CONTENT.length * 3L), null) ); @@ -410,7 +410,7 @@ public void testCreateSplitsWithEmptyObjectsIteratingOnlyNonEmptyObjects() ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null ); Assert.assertEquals( @@ -444,7 +444,7 @@ public void testAccessDeniedWhileListingPrefix() ); inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null ).collect(Collectors.toList()); } diff --git 
a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java index dba662a91199..caebb4229e86 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java @@ -159,7 +159,7 @@ public void testWithUrisSplit() new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null, null); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null ); Assert.assertEquals(EXPECTED_OBJECTS, splits.map(InputSplit::get).collect(Collectors.toList())); @@ -178,7 +178,7 @@ public void testWithUrisFilter() ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null ); Assert.assertEquals(EXPECTED_OBJECTS, splits.map(InputSplit::get).collect(Collectors.toList())); @@ -229,7 +229,7 @@ public void testWithPrefixesSplit() throws IOException new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null, PREFIXES, null, null); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), new MaxSizeSplitHintSpec(null, 1) ); @@ -251,7 +251,7 @@ public void testCreateSplitsWithSplitHintSpecRespectingHint() throws IOException new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null, PREFIXES, null, null); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new 
JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), new MaxSizeSplitHintSpec(new HumanReadableBytes(CONTENT.length * 3L), null) ); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java index 86194d8b098e..fe0b89e996f8 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java @@ -91,7 +91,8 @@ public void setUp() // Key Format new JsonInputFormat( new JSONPathSpec(true, ImmutableList.of()), - null, null, false //make sure JsonReader is used + null, null, false, //make sure JsonReader is used + false, false ), // Value Format new JsonInputFormat( @@ -106,7 +107,8 @@ public void setUp() new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") ) ), - null, null, false //make sure JsonReader is used + null, null, false, //make sure JsonReader is used + false, false ), "kafka.newheader.", "kafka.newkey.key", "kafka.newts.timestamp" ); @@ -121,7 +123,8 @@ public void testSerde() throws JsonProcessingException // Key Format new JsonInputFormat( new JSONPathSpec(true, ImmutableList.of()), - null, null, false //make sure JsonReader is used + null, null, false, //make sure JsonReader is used + false, false ), // Value Format new JsonInputFormat( @@ -136,7 +139,8 @@ public void testSerde() throws JsonProcessingException new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") ) ), - null, null, false //make sure JsonReader is used + null, null, false, //make sure JsonReader is used + false, false ), "kafka.newheader.", "kafka.newkey.key", "kafka.newts.timestamp" ); @@ -407,7 +411,8 @@ public void testWithOutKeyAndHeaderSpecs() throws IOException new 
JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2") ) ), - null, null, false //make sure JsonReader is used + null, null, false, //make sure JsonReader is used + false, false ), "kafka.newheader.", "kafka.newkey.", "kafka.newts." ); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java index bf5831ccaef7..4873949ecefe 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java @@ -130,7 +130,7 @@ public void testSample() null, new KafkaSupervisorIOConfig( TOPIC, - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null, null, null, @@ -302,7 +302,7 @@ public void testInvalidKafkaConfig() null, new KafkaSupervisorIOConfig( TOPIC, - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null, null, null, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index e6a3a919b421..a5d1b77458d4 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -133,6 +133,8 @@ public class KafkaSupervisorTest extends EasyMockSupport private static final InputFormat INPUT_FORMAT = new JsonInputFormat( new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), + false, + 
false, false ); private static final String TOPIC_PREFIX = "testTopic"; diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java index 9a446f5265e8..9e7d76edddbd 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java @@ -135,7 +135,7 @@ public void testSample() throws Exception null, new KinesisSupervisorIOConfig( STREAM, - new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false), + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false), null, null, null, diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 42866c23107a..99d8d5223b7b 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -122,6 +122,9 @@ public class KinesisSupervisorTest extends EasyMockSupport private static final InputFormat INPUT_FORMAT = new JsonInputFormat( new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), + false, + false, + false, false ); private static final String DATASOURCE = "testDS"; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java index 2bd59a7f9cdf..fc715d3544b3 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java @@ -229,7 +229,7 @@ private static DataSourcePlan forInline( return forExternal( new ExternalDataSource( dataString.isEmpty() ? NilInputSource.instance() : new InlineInputSource(dataString), - new JsonInputFormat(null, null, null), + new JsonInputFormat(null, null, null, null, null), signature ), broadcast diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java index 10c67e27a28b..6b008952d5ed 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQSelectTest.java @@ -625,7 +625,7 @@ public void testExternSelect1() throws IOException .setDataSource( new ExternalDataSource( new LocalInputSource(null, null, ImmutableList.of(toRead.getAbsoluteFile())), - new JsonInputFormat(null, null, null), + new JsonInputFormat(null, null, null, null, null), RowSignature.builder() .add("timestamp", ColumnType.STRING) .add("page", ColumnType.STRING) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQWarningsTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQWarningsTest.java index e25867b059b6..c41786ce3f21 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQWarningsTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/error/MSQWarningsTest.java @@ -82,7 +82,7 @@ public void setUp3() 
throws IOException toRead.getAbsoluteFile() ) ), - new JsonInputFormat(null, null, null), + new JsonInputFormat(null, null, null, null, null), RowSignature.builder() .add("timestamp", ColumnType.STRING) .add("page", ColumnType.STRING) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicerTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicerTest.java index 186cb0b9082f..23728beb44db 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicerTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/external/ExternalInputSpecSlicerTest.java @@ -49,7 +49,7 @@ public class ExternalInputSpecSlicerTest { - static final InputFormat INPUT_FORMAT = new JsonInputFormat(null, null, null); + static final InputFormat INPUT_FORMAT = new JsonInputFormat(null, null, null, null, null); static final RowSignature SIGNATURE = RowSignature.builder().add("s", ColumnType.STRING).build(); private ExternalInputSpecSlicer slicer; diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java index afca41d61f7e..0836f5a5daa6 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java @@ -663,7 +663,7 @@ public void testWithUrisSplit() ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null ); @@ -688,7 +688,7 @@ public void testWithUrisFilter() ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new 
JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null ); @@ -713,7 +713,7 @@ public void testWithObjectsFilter() ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null ); @@ -738,7 +738,7 @@ public void testWithoutObjectsFilter() ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null ); @@ -768,7 +768,7 @@ public void testWithPrefixesSplit() ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), new MaxSizeSplitHintSpec(null, 1) ); @@ -799,7 +799,7 @@ public void testGetPrefixesSplitStreamWithFilter() ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), new MaxSizeSplitHintSpec(null, 1) ); @@ -830,7 +830,7 @@ public void testCreateSplitsWithSplitHintSpecRespectingHint() ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), new MaxSizeSplitHintSpec(new HumanReadableBytes(CONTENT.length * 3L), null) ); @@ -864,7 +864,7 @@ public void testCreateSplitsWithEmptyObjectsIteratingOnlyNonEmptyObjects() ); Stream>> splits = inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null ); Assert.assertEquals( @@ -902,7 +902,7 @@ public void testAccessDeniedWhileListingPrefix() ); inputSource.createSplits( - new JsonInputFormat(JSONPathSpec.DEFAULT, null, null), + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null ).collect(Collectors.toList()); } diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index 125e8f1be801..095630a658cb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -1604,7 +1604,7 @@ public void testMultipleParseExceptionsSuccess() throws Exception tmpDir, timestampSpec, dimensionsSpec, - new JsonInputFormat(null, null, null), + new JsonInputFormat(null, null, null, null, null), null, null, tuningConfig, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingWithNullColumnTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingWithNullColumnTest.java index 2253d9ac334b..22a006bd6c29 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingWithNullColumnTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/MultiPhaseParallelIndexingWithNullColumnTest.java @@ -80,7 +80,7 @@ public class MultiPhaseParallelIndexingWithNullColumnTest extends AbstractMultiP private static final DimensionsSpec DIMENSIONS_SPEC = new DimensionsSpec( DimensionsSpec.getDefaultSchemas(Arrays.asList("ts", "dim1", "dim2")) ); - private static final InputFormat JSON_FORMAT = new JsonInputFormat(null, null, null); + private static final InputFormat JSON_FORMAT = new JsonInputFormat(null, null, null, null, null); private static final List INTERVAL_TO_INDEX = Collections.singletonList(Intervals.of("2022-01/P1M")); @Parameterized.Parameters @@ -194,6 +194,8 @@ public void testIngestNullColumn_useFieldDiscovery_includeAllDimensions_shouldSt new JsonInputFormat( new JSONPathSpec(true, null), null, + null, + null, null 
), false, @@ -259,6 +261,8 @@ public void testIngestNullColumn_explicitPathSpec_useFieldDiscovery_includeAllDi ) ), null, + null, + null, null ), false, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java index 212914556560..c91c9772e120 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java @@ -227,7 +227,7 @@ public void testFailToConstructWhenBothAppendToExistingAndForceGuaranteedRollupA final ParallelIndexIOConfig ioConfig = new ParallelIndexIOConfig( null, new InlineInputSource("test"), - new JsonInputFormat(null, null, null), + new JsonInputFormat(null, null, null, null, null), appendToExisting, null ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java index 150d0b2c1802..7f315ab33043 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java @@ -289,6 +289,6 @@ static String createRowFromMap(long timestamp, Map fields) static InputFormat getInputFormat() { - return new JsonInputFormat(null, null, null); + return new JsonInputFormat(null, null, null, null, null); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTaskTest.java 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTaskTest.java index 48abc95e08dc..749921fff3cd 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTaskTest.java @@ -47,7 +47,7 @@ public class PartialHashSegmentGenerateTaskTest private static final ObjectMapper OBJECT_MAPPER = ParallelIndexTestingFactory.createObjectMapper(); private static final ParallelIndexIngestionSpec INGESTION_SPEC = ParallelIndexTestingFactory.createIngestionSpec( new LocalInputSource(new File("baseDir"), "filer"), - new JsonInputFormat(null, null, null), + new JsonInputFormat(null, null, null, null, null), new ParallelIndexTestingFactory.TuningConfigBuilder().build(), ParallelIndexTestingFactory.createDataSchema(ParallelIndexTestingFactory.INPUT_INTERVALS) ); @@ -161,7 +161,7 @@ public void requiresGranularitySpecInputIntervals() ParallelIndexTestingFactory.NUM_ATTEMPTS, ParallelIndexTestingFactory.createIngestionSpec( new LocalInputSource(new File("baseDir"), "filer"), - new JsonInputFormat(null, null, null), + new JsonInputFormat(null, null, null, null, null), new ParallelIndexTestingFactory.TuningConfigBuilder().build(), ParallelIndexTestingFactory.createDataSchema(null) ), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java index 031dd68d4cd0..e1907c9dcab1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java @@ -851,6 
+851,8 @@ public void testIngestBothExplicitAndImplicitDims() throws IOException new JsonInputFormat( new JSONPathSpec(true, null), null, + null, + null, null ), false, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java index c28d43c3858b..d42f7e69200f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java @@ -54,7 +54,7 @@ public class SinglePhaseSubTaskSpecTest new ParallelIndexIOConfig( null, new LocalInputSource(new File("baseDir"), "filter"), - new JsonInputFormat(null, null, null), + new JsonInputFormat(null, null, null, null, null), null, null ), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java index 50e2f63c0f86..af612572b5bd 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java @@ -1463,7 +1463,7 @@ private InputFormat createInputFormat() { switch (parserType) { case STR_JSON: - return new JsonInputFormat(null, null, null); + return new JsonInputFormat(null, null, null, null, null); case STR_CSV: return new CsvInputFormat(ImmutableList.of("t", "dim1", "dim2", "met1"), null, null, false, 0); default: diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java index 
92bb04d6cb31..e93510ad7400 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java @@ -128,6 +128,8 @@ public class SeekableStreamIndexTaskTestBase extends EasyMockSupport protected static final InputFormat INPUT_FORMAT = new JsonInputFormat( new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), + null, + null, null ); protected static final Logger LOG = new Logger(SeekableStreamIndexTaskTestBase.class); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index 7cdbb7481af8..7116b8813743 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -856,7 +856,7 @@ public void testSeekableStreamSupervisorSpecWithScaleDisable() throws Interrupte { SeekableStreamSupervisorIOConfig seekableStreamSupervisorIOConfig = new SeekableStreamSupervisorIOConfig( "stream", - new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false), + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false), 1, 1, new Period("PT1H"), @@ -927,7 +927,7 @@ private SeekableStreamSupervisorIOConfig getIOConfig(int taskCount, boolean scal if (scaleOut) { return new SeekableStreamSupervisorIOConfig( "stream", - new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false), + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false), 1, taskCount, new Period("PT1H"), @@ -944,7 +944,7 @@ private 
SeekableStreamSupervisorIOConfig getIOConfig(int taskCount, boolean scal } else { return new SeekableStreamSupervisorIOConfig( "stream", - new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false), + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false), 1, taskCount, new Period("PT1H"), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java index 1cab704a2eff..f3485e06f48a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java @@ -106,7 +106,7 @@ public void testWithParserAndNullInputformatParseProperly() throws IOException @Test public void testWithNullParserAndInputformatParseProperly() throws IOException { - final JsonInputFormat inputFormat = new JsonInputFormat(JSONPathSpec.DEFAULT, Collections.emptyMap(), null); + final JsonInputFormat inputFormat = new JsonInputFormat(JSONPathSpec.DEFAULT, Collections.emptyMap(), null, null, null); final StreamChunkParser chunkParser = new StreamChunkParser<>( null, inputFormat, @@ -237,7 +237,7 @@ static class Props private TrackingJsonInputFormat(@Nullable JSONPathSpec flattenSpec, @Nullable Map featureSpec) { - super(flattenSpec, featureSpec, null); + super(flattenSpec, featureSpec, null, null, null); props = new Props(); } @@ -246,7 +246,7 @@ private TrackingJsonInputFormat(@Nullable JSONPathSpec flattenSpec, boolean lineSplittable, Props props) { - super(flattenSpec, featureSpec, null, lineSplittable); + super(flattenSpec, featureSpec, null, lineSplittable, null, null); this.props = props; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 82f3b58631fa..a84001dfc0ef 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -851,7 +851,7 @@ private void expectEmitterSupervisor(boolean suspended) throws EntryExistsExcept EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); EasyMock.expect(spec.getIoConfig()).andReturn(new SeekableStreamSupervisorIOConfig( "stream", - new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false), + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false), 1, 1, new Period("PT1H"), @@ -909,7 +909,7 @@ private static SeekableStreamSupervisorIOConfig getIOConfig() { return new SeekableStreamSupervisorIOConfig( "stream", - new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false), + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false), 1, 1, new Period("PT1H"), From 030bec165dea49eca204b5908430592b891262a8 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Thu, 15 Sep 2022 23:38:02 -0500 Subject: [PATCH 2/3] Fix serde and docs --- .../druid/data/input/impl/JsonInputFormat.java | 16 +++++++++++++--- .../druid/data/input/impl/JsonNodeReader.java | 2 +- docs/ingestion/data-formats.md | 2 +- website/.spelling | 2 ++ 4 files changed, 17 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java index 233d205fe4e3..b51c64914048 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java +++ 
b/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java @@ -123,6 +123,18 @@ public boolean isKeepNullColumns() return keepNullColumns; } + @JsonProperty + public boolean isAssumeNewlineDelimited() + { + return assumeNewlineDelimited; + } + + @JsonProperty + public boolean isUseJsonNodeReader() + { + return useJsonNodeReader; + } + @Override public boolean isSplittable() { @@ -174,8 +186,7 @@ public boolean equals(Object o) && lineSplittable == that.lineSplittable && assumeNewlineDelimited == that.assumeNewlineDelimited && useJsonNodeReader == that.useJsonNodeReader - && Objects.equals(featureSpec, that.featureSpec) - && Objects.equals(objectMapper, that.objectMapper); + && Objects.equals(featureSpec, that.featureSpec); } @Override @@ -184,7 +195,6 @@ public int hashCode() return Objects.hash( super.hashCode(), featureSpec, - objectMapper, keepNullColumns, lineSplittable, assumeNewlineDelimited, diff --git a/core/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java b/core/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java index 995172c52eee..7a7b98c4529a 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/JsonNodeReader.java @@ -132,7 +132,7 @@ protected InputEntity source() } @Override - protected List parseInputRows(JsonNode intermediateRow) throws IOException, ParseException + protected List parseInputRows(JsonNode intermediateRow) throws ParseException { if (intermediateRow instanceof ParseExceptionMarkerJsonNode) { throw ((ParseExceptionMarkerJsonNode) intermediateRow).getParseException(); diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md index 0b696fe2e1e4..e026d948f36d 100644 --- a/docs/ingestion/data-formats.md +++ b/docs/ingestion/data-formats.md @@ -96,7 +96,7 @@ The following properties are specialized properties that only apply when the JSO | Field | Type | Description | Required | 
|-------|------|-------------|----------| | assumeNewlineDelimited | Boolean | If the input is known to be newline delimited JSON (each individual JSON event is contained in a single line, separated by newlines), setting this option to true allows for more flexible parsing exception handling. Only the lines with invalid JSON syntax will be discarded, while lines containing valid JSON events will still be ingested. | no (Default false) | -| useJsonNodeReader | Boolean | When ingesting multi-line JSON events, enabling this option will enable the use of a JSON parser which will retain any valid JSON events encountered within a streaming record prior to when a parsing exception occured. | no (Default false) | +| useJsonNodeReader | Boolean | When ingesting multi-line JSON events, enabling this option will enable the use of a JSON parser which will retain any valid JSON events encountered within a streaming record prior to when a parsing exception occurred. | no (Default false) | For example: ```json diff --git a/website/.spelling b/website/.spelling index 2f009625900c..9ef7392f032e 100644 --- a/website/.spelling +++ b/website/.spelling @@ -216,6 +216,7 @@ aggregators ambari analytics arrayElement +assumeNewlineDelimited assumeRoleArn assumeRoleExternalId async @@ -484,6 +485,7 @@ unsetting untrusted useFilterCNF useJqSyntax +useJsonNodeReader useSSL uptime uris From b02eb42e75fe5b71c41430440054b5e7432eaacb Mon Sep 17 00:00:00 2001 From: jon-wei Date: Mon, 26 Sep 2022 12:20:11 -0500 Subject: [PATCH 3/3] Add PR comment check --- .../org/apache/druid/data/input/impl/JsonInputFormat.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java b/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java index b51c64914048..041942c9e626 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java +++ 
b/core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java @@ -27,6 +27,7 @@ import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.parsers.JSONPathSpec; import javax.annotation.Nullable; @@ -108,6 +109,9 @@ public JsonInputFormat( this.lineSplittable = lineSplittable; this.assumeNewlineDelimited = assumeNewlineDelimited != null && assumeNewlineDelimited; this.useJsonNodeReader = useJsonNodeReader != null && useJsonNodeReader; + if (this.assumeNewlineDelimited && this.useJsonNodeReader) { + throw new IAE("useJsonNodeReader cannot be set to true when assumeNewlineDelimited is true."); + } } @JsonProperty