Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.druid.java.util.common.parsers.ObjectFlattener;
import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.CollectionUtils;

import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -90,11 +91,12 @@ protected CloseableIterator<String> intermediateRowIterator() throws IOException
@Override
protected List<InputRow> parseInputRows(String intermediateRow) throws IOException, ParseException
{
final List<InputRow> inputRows;
try (JsonParser parser = new JsonFactory().createParser(intermediateRow)) {
final MappingIterator<JsonNode> delegate = mapper.readValues(parser, JsonNode.class);
return FluentIterable.from(() -> delegate)
.transform(jsonNode -> MapInputRowParser.parse(inputRowSchema, flattener.flatten(jsonNode)))
.toList();
inputRows = FluentIterable.from(() -> delegate)
.transform(jsonNode -> MapInputRowParser.parse(inputRowSchema, flattener.flatten(jsonNode)))
.toList();
}
catch (RuntimeException e) {
//convert Jackson's JsonParseException into druid's exception for further processing
Expand All @@ -106,6 +108,10 @@ protected List<InputRow> parseInputRows(String intermediateRow) throws IOExcepti
//throw unknown exception
throw e;
}
if (CollectionUtils.isNullOrEmpty(inputRows)) {
throw new ParseException("Unable to parse [%s] as the intermediateRow resulted in empty input row", intermediateRow);
}
return inputRows;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,62 @@ public void testSamplInvalidJSONText() throws IOException
}
}

@Test
public void testEmptyJSONText() throws IOException
{
final JsonInputFormat format = new JsonInputFormat(
new JSONPathSpec(
true,
ImmutableList.of(
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"),
new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"),
new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"),
new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2")
)
),
null,
null,
false //make sure JsonReader is used
);

//input is empty
final ByteEntity source = new ByteEntity(
StringUtils.toUtf8(
"" // empty row
)
);

final InputEntityReader reader = format.createReader(
new InputRowSchema(
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
Collections.emptyList()
),
source,
null
);

//expect a ParseException on the following `next` call on iterator
expectedException.expect(ParseException.class);

// the 2nd line is ill-formed, so the parse of this text chunk aborts
final int numExpectedIterations = 0;

try (CloseableIterator<InputRow> iterator = reader.read()) {
int numActualIterations = 0;
while (iterator.hasNext()) {
iterator.next();
++numActualIterations;
}

Assert.assertEquals(numExpectedIterations, numActualIterations);
}
}



@Test
public void testSampleEmptyText() throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ private static List<ProducerRecord<byte[], byte[]>> generateRecords(String topic
new ProducerRecord<byte[], byte[]>(topic, 0, null, jbb("2011", "e", "y", "10", "20.0", "1.0"), SAMPLE_HEADERS),
new ProducerRecord<>(topic, 0, null, jbb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable")),
new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable2")),
new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("")),
new ProducerRecord<>(topic, 0, null, null),
new ProducerRecord<>(topic, 0, null, jbb("2013", "f", "y", "10", "20.0", "1.0")),
new ProducerRecord<>(topic, 0, null, jbb("2049", "f", "y", "notanumber", "20.0", "1.0")),
Expand Down Expand Up @@ -1491,7 +1491,7 @@ public void testMultipleParseExceptionsSuccess() throws Exception
"Found unparseable columns in row: [MapBasedInputRow{timestamp=2049-01-01T00:00:00.000Z, event={timestamp=2049, dim1=f, dim2=y, dimLong=10, dimFloat=20.0, met1=notanumber}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}], exceptions: [Unable to parse value[notanumber] for field[met1]]",
"Found unparseable columns in row: [MapBasedInputRow{timestamp=2049-01-01T00:00:00.000Z, event={timestamp=2049, dim1=f, dim2=y, dimLong=10, dimFloat=notanumber, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}], exceptions: [could not convert value [notanumber] to float]",
"Found unparseable columns in row: [MapBasedInputRow{timestamp=2049-01-01T00:00:00.000Z, event={timestamp=2049, dim1=f, dim2=y, dimLong=notanumber, dimFloat=20.0, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}], exceptions: [could not convert value [notanumber] to long]",
"Unable to parse row [unparseable2]",
"Unable to parse [] as the intermediateRow resulted in empty input row",
"Unable to parse row [unparseable]",
"Encountered row with timestamp[246140482-04-24T15:36:27.903Z] that cannot be represented as a long: [{timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}]"
)
Expand Down Expand Up @@ -1560,7 +1560,7 @@ public void testMultipleParseExceptionsFailure() throws Exception
Map<String, Object> unparseableEvents = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS,
Arrays.asList(
"Unable to parse row [unparseable2]",
"Unable to parse [] as the intermediateRow resulted in empty input row",
"Unable to parse row [unparseable]"
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ private static List<OrderedPartitionableRecord<String, String, ByteEntity>> gene
stream,
"1",
"7",
Collections.singletonList(new ByteEntity(StringUtils.toUtf8("unparseable2")))
Collections.singletonList(new ByteEntity(StringUtils.toUtf8("")))
),
new OrderedPartitionableRecord<>(stream, "1", "8", Collections.singletonList(new ByteEntity(StringUtils.toUtf8("{}")))),
new OrderedPartitionableRecord<>(stream, "1", "9", jbl("2013", "f", "y", "10", "20.0", "1.0")),
Expand Down Expand Up @@ -1347,7 +1347,7 @@ public void testMultipleParseExceptionsSuccess() throws Exception
"Found unparseable columns in row: [MapBasedInputRow{timestamp=2049-01-01T00:00:00.000Z, event={timestamp=2049, dim1=f, dim2=y, dimLong=10, dimFloat=notanumber, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}], exceptions: [could not convert value [notanumber] to float]",
"Found unparseable columns in row: [MapBasedInputRow{timestamp=2049-01-01T00:00:00.000Z, event={timestamp=2049, dim1=f, dim2=y, dimLong=notanumber, dimFloat=20.0, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}], exceptions: [could not convert value [notanumber] to long]",
"Timestamp[null] is unparseable! Event: {}",
"Unable to parse row [unparseable2]",
"Unable to parse [] as the intermediateRow resulted in empty input row",
"Unable to parse row [unparseable]",
"Encountered row with timestamp[246140482-04-24T15:36:27.903Z] that cannot be represented as a long: [{timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}]"
)
Expand Down Expand Up @@ -1434,7 +1434,7 @@ public void testMultipleParseExceptionsFailure() throws Exception
Map<String, Object> unparseableEvents = ImmutableMap.of(
RowIngestionMeters.BUILD_SEGMENTS,
Arrays.asList(
"Unable to parse row [unparseable2]",
"Unable to parse [] as the intermediateRow resulted in empty input row",
"Unable to parse row [unparseable]"
)
);
Expand Down