Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -55,21 +56,42 @@ public class JsonInputFormat extends NestedInputFormat
*/
private final boolean lineSplittable;

/**
* If true, overrides the behavior controlled by lineSplittable and the parsing will assume that the input
* is newline delimited. This is to allow streaming ingestion to use the newline delimited parser when the
* input is known to follow that format, since this allows for more flexible handling of parse errors (i.e.,
* an invalid JSON event on one line will not prevent other events on different lines from being ingested).
*/
private final boolean assumeNewlineDelimited;

/**
* If true, use the JsonNodeReader when parsing non-newline delimited JSON. This parser splits the input string
* into JsonNode objects, parsing each JsonNode into a separate InputRow, instead of parsing the input string
* into several InputRow at the same time. This allows for more flexible handling of parse errors, where timestamp
* parsing errors do not prevent other events in the same string from being ingested, and allows valid events prior to
* encountering invalid JSON syntax to be ingested as well.
*/
private final boolean useJsonNodeReader;

@JsonCreator
public JsonInputFormat(
    @JsonProperty("flattenSpec") @Nullable JSONPathSpec flattenSpec,
    @JsonProperty("featureSpec") @Nullable Map<String, Boolean> featureSpec,
    @JsonProperty("keepNullColumns") @Nullable Boolean keepNullColumns,
    @JsonProperty("assumeNewlineDelimited") @Nullable Boolean assumeNewlineDelimited,
    @JsonProperty("useJsonNodeReader") @Nullable Boolean useJsonNodeReader
)
{
  // Jackson deserialization entry point; formats built from a spec are always line-splittable,
  // so delegate to the full constructor with lineSplittable = true.
  this(flattenSpec, featureSpec, keepNullColumns, true, assumeNewlineDelimited, useJsonNodeReader);
}

public JsonInputFormat(
@Nullable JSONPathSpec flattenSpec,
Map<String, Boolean> featureSpec,
Boolean keepNullColumns,
boolean lineSplittable
boolean lineSplittable,
Boolean assumeNewlineDelimited,
Boolean useJsonNodeReader
)
{
super(flattenSpec);
Expand All @@ -85,6 +107,11 @@ public JsonInputFormat(
objectMapper.configure(feature, entry.getValue());
}
this.lineSplittable = lineSplittable;
this.assumeNewlineDelimited = assumeNewlineDelimited != null && assumeNewlineDelimited;
this.useJsonNodeReader = useJsonNodeReader != null && useJsonNodeReader;
Copy link
Copy Markdown
Contributor

@zachjsh zachjsh Sep 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The description for this property above seems to indicate that it should be used for non-newline delimited JSON. Would assumeNewlineDelimited true interfere with this / should this be forced to false if assumeNewlineDelimited is true?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems like maybe it would be an error to indicate useJsonNodeReader if assumeNewLineDelimted is set, though ignoring it seems ok too 🤷

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay, that makes sense, I'll have it be an error

if (this.assumeNewlineDelimited && this.useJsonNodeReader) {
throw new IAE("useJsonNodeReader cannot be set to true when assumeNewlineDelimited is true.");
}
}

@JsonProperty
Expand All @@ -100,6 +127,18 @@ public boolean isKeepNullColumns()
return keepNullColumns;
}

@JsonProperty
public boolean isAssumeNewlineDelimited()
{
  // Accessor for the streaming-ingestion override that forces newline-delimited parsing.
  return this.assumeNewlineDelimited;
}

@JsonProperty
public boolean isUseJsonNodeReader()
{
  // Accessor for the flag selecting JsonNodeReader for non-newline-delimited input.
  return this.useJsonNodeReader;
}

@Override
public boolean isSplittable()
{
Expand All @@ -109,9 +148,13 @@ public boolean isSplittable()
@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
return this.lineSplittable ?
new JsonLineReader(inputRowSchema, source, getFlattenSpec(), objectMapper, keepNullColumns) :
new JsonReader(inputRowSchema, source, getFlattenSpec(), objectMapper, keepNullColumns);
if (this.lineSplittable || this.assumeNewlineDelimited) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When would lineSplitable be false, but the the data is actually newLineDelimited? Does this situation arise when you have a batch of new line delimited json events being ingested at one time, for example?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lineSplittable is used as an indicator from the task implementation as to whether it expects the input to be newline delimited (and thus use either JsonLineReader or JsonReader). Prior to this patch, batch tasks would always use newline delimited (JsonLineReader) and streaming tasks would always use the multiline parser (JsonReader).

With this patch, batch tasks would still always use newline delimited, while for streaming tasks the behavior is configurable (controlled by the new asssumeNewlineDelimited property)

return new JsonLineReader(inputRowSchema, source, getFlattenSpec(), objectMapper, keepNullColumns);
} else if (this.useJsonNodeReader) {
return new JsonNodeReader(inputRowSchema, source, getFlattenSpec(), objectMapper, keepNullColumns);
} else {
return new JsonReader(inputRowSchema, source, getFlattenSpec(), objectMapper, keepNullColumns);
}
}

/**
Expand All @@ -124,7 +167,10 @@ public JsonInputFormat withLineSplittable(boolean lineSplittable)
return new JsonInputFormat(this.getFlattenSpec(),
this.getFeatureSpec(),
this.keepNullColumns,
lineSplittable);
lineSplittable,
assumeNewlineDelimited,
useJsonNodeReader
);
}

@Override
Expand All @@ -140,12 +186,23 @@ public boolean equals(Object o)
return false;
}
JsonInputFormat that = (JsonInputFormat) o;
return this.lineSplittable == that.lineSplittable && Objects.equals(featureSpec, that.featureSpec) && Objects.equals(keepNullColumns, that.keepNullColumns);
return keepNullColumns == that.keepNullColumns
&& lineSplittable == that.lineSplittable
&& assumeNewlineDelimited == that.assumeNewlineDelimited
&& useJsonNodeReader == that.useJsonNodeReader
&& Objects.equals(featureSpec, that.featureSpec);
}

@Override
public int hashCode()
{
  // Combine the parent-class hash with every field that participates in equals().
  return Objects.hash(super.hashCode(), featureSpec, keepNullColumns, lineSplittable, assumeNewlineDelimited, useJsonNodeReader);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.data.input.impl;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.io.IOUtils;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.IntermediateRowParsingReader;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.JSONFlattenerMaker;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.common.parsers.ObjectFlattener;
import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.CollectionUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
* In contrast to {@link JsonLineReader} which processes input text line by line independently,
* this class tries to split the input into a list of JsonNode objects, and then parses each JsonNode independently
* into an InputRow.
*
* <p>
* The input text can be:
 * 1. a JSON string of an object in a line or multiple lines (such as pretty-printed JSON text)
 * 2. multiple JSON object strings concatenated by whitespace character(s)
* <p>
* If an input string contains invalid JSON syntax, any valid JSON objects found prior to encountering the invalid
Copy link
Copy Markdown
Contributor

@YongGang YongGang Sep 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wonder whether we can distinguish the failure is due to JSON syntax error or data field in wrong format (e.g. wrong Timestamp format).
If it's due to field in wrong format then we can continue to parse the rest.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Timestamp in wrong format errors are special, in that Druid requires a valid __time field. For other columns, such parse errors would be detected at a later stage, and wouldn't run into the same issue of preventing other events from being ingested

The JSON syntax errors here are errors where malformed JSON syntax prevents the event from being deserialized at all into a map.

* syntax will be successfully parsed, but parsing will not continue after the invalid syntax.
* <p>
*/
public class JsonNodeReader extends IntermediateRowParsingReader<JsonNode>
{
private final ObjectFlattener<JsonNode> flattener;
private final ObjectMapper mapper;
private final InputEntity source;
private final InputRowSchema inputRowSchema;
private final JsonFactory jsonFactory;

JsonNodeReader(
InputRowSchema inputRowSchema,
InputEntity source,
JSONPathSpec flattenSpec,
ObjectMapper mapper,
boolean keepNullColumns
)
{
this.inputRowSchema = inputRowSchema;
this.source = source;
this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(keepNullColumns));
this.mapper = mapper;
this.jsonFactory = new JsonFactory();
}

@Override
protected CloseableIterator<JsonNode> intermediateRowIterator() throws IOException
{
final String sourceString = IOUtils.toString(source.open(), StringUtils.UTF8_STRING);
final List<JsonNode> jsonNodes = new ArrayList<>();
try {
JsonParser parser = jsonFactory.createParser(sourceString);
final MappingIterator<JsonNode> delegate = mapper.readValues(parser, JsonNode.class);
while (delegate.hasNext()) {
jsonNodes.add(delegate.next());
}
}
catch (Exception e) {
//convert Jackson's JsonParseException into druid's exception for further processing
//JsonParseException will be thrown from MappingIterator#hasNext or MappingIterator#next when input json text is ill-formed
if (e.getCause() instanceof JsonParseException) {
jsonNodes.add(
new ParseExceptionMarkerJsonNode(
new ParseException(sourceString, e, "Unable to parse row [%s]", sourceString)
)
);
} else {
throw e;
}
}

if (CollectionUtils.isNullOrEmpty(jsonNodes)) {
jsonNodes.add(
new ParseExceptionMarkerJsonNode(
new ParseException(
sourceString,
"Unable to parse [%s] as the intermediateRow resulted in empty input row",
sourceString
)
)
);
}
return CloseableIterators.withEmptyBaggage(jsonNodes.iterator());
}

@Override
protected InputEntity source()
{
return source;
}

@Override
protected List<InputRow> parseInputRows(JsonNode intermediateRow) throws ParseException
{
if (intermediateRow instanceof ParseExceptionMarkerJsonNode) {
throw ((ParseExceptionMarkerJsonNode) intermediateRow).getParseException();
}
final List<InputRow> inputRows = Collections.singletonList(
MapInputRowParser.parse(inputRowSchema, flattener.flatten(intermediateRow))
);

if (CollectionUtils.isNullOrEmpty(inputRows)) {
throw new ParseException(
intermediateRow.toString(),
"Unable to parse [%s] as the intermediateRow resulted in empty input row",
intermediateRow.toString()
);
}
return inputRows;
}

@Override
protected List<Map<String, Object>> toMap(JsonNode intermediateRow) throws IOException
{
if (intermediateRow instanceof ParseExceptionMarkerJsonNode) {
throw ((ParseExceptionMarkerJsonNode) intermediateRow).getParseException();
}
return Collections.singletonList(
mapper.readValue(intermediateRow.toString(), new TypeReference<Map<String, Object>>()
{
})
);
}

private static class ParseExceptionMarkerJsonNode extends ObjectNode
{
final ParseException parseException;

public ParseExceptionMarkerJsonNode(ParseException pe)
{
super(null);
this.parseException = pe;
}

public ParseException getParseException()
{
return parseException;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void testWithUrisFilter()
);

Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
new MaxSizeSplitHintSpec(null, 1)
);

Expand All @@ -152,7 +152,7 @@ public void testWithUris()
);

Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
new MaxSizeSplitHintSpec(null, 1)
);

Expand All @@ -173,7 +173,7 @@ public void testWithObjectsFilter()
);

Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
new MaxSizeSplitHintSpec(null, 1)
);

Expand All @@ -194,7 +194,7 @@ public void testWithObjects()
);

Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
new MaxSizeSplitHintSpec(null, 1)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ public void testSerde() throws IOException
)
),
ImmutableMap.of(Feature.ALLOW_COMMENTS.name(), true, Feature.ALLOW_UNQUOTED_FIELD_NAMES.name(), false),
true
true,
false,
false
);
final byte[] bytes = mapper.writeValueAsBytes(format);
final JsonInputFormat fromJson = (JsonInputFormat) mapper.readValue(bytes, InputFormat.class);
Expand All @@ -79,6 +81,8 @@ public void test_unsetUseFieldDiscovery_unsetKeepNullColumnsByDefault()
final JsonInputFormat format = new JsonInputFormat(
new JSONPathSpec(false, null),
null,
null,
null,
null
);
Assert.assertFalse(format.isKeepNullColumns());
Expand All @@ -90,6 +94,8 @@ public void testUseFieldDiscovery_setKeepNullColumnsByDefault()
final JsonInputFormat format = new JsonInputFormat(
new JSONPathSpec(true, null),
null,
null,
null,
null
);
Assert.assertTrue(format.isKeepNullColumns());
Expand All @@ -101,7 +107,9 @@ public void testUseFieldDiscovery_doNotChangeKeepNullColumnsUserSets()
final JsonInputFormat format = new JsonInputFormat(
new JSONPathSpec(true, null),
null,
false
false,
null,
null
);
Assert.assertFalse(format.isKeepNullColumns());
}
Expand Down
Loading