Fix streaming ingestion fails if it encounters empty rows (Regression)#10962

Merged
maytasm merged 2 commits into apache:master from maytasm:IMPLY-6187
Mar 9, 2021

Conversation

@maytasm
Contributor

@maytasm maytasm commented Mar 9, 2021

Fix streaming ingestion fails if it encounters empty rows (Regression)

Description

This is a regression in the JSON inputFormat when used with streaming ingestion, introduced by #10383.

A streaming ingestion task consistently fails when it tries to parse an empty row, even when maxParseExceptions has not been reached. This is because parsing an empty row does not result in a ParseException but in a java.util.NoSuchElementException instead. As a result, the task fails and cannot move past the empty row even with maxParseExceptions set.
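A minimal sketch of the intended behavior, using simplified stand-ins (the class and nested ParseException below are hypothetical; the real class is org.apache.druid.java.util.common.parsers.ParseException, and the real delegate is the Jackson-backed JSON iterator in JsonReader): an unexpected NoSuchElementException is rethrown as a ParseException, and an empty result also becomes a ParseException, so the row counts against maxParseExceptions instead of failing the task outright.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class EmptyRowSketch {
    // Hypothetical stand-in for Druid's ParseException, which the ingestion
    // task counts against maxParseExceptions.
    public static class ParseException extends RuntimeException {
        public ParseException(String message) { super(message); }
        public ParseException(String message, Throwable cause) { super(message, cause); }
    }

    // Sketch of the fixed parse path: a NoSuchElementException thrown while
    // iterating the parsed JSON (as happens for an empty row) is rethrown as a
    // ParseException, and an empty result list also becomes a ParseException,
    // so the task can skip the row and keep consuming the stream.
    public static List<String> parseInputRows(Iterator<String> delegate, String intermediateRow) {
        final List<String> inputRows = new ArrayList<>();
        try {
            while (delegate.hasNext()) {
                inputRows.add(delegate.next());
            }
        } catch (NoSuchElementException e) {
            throw new ParseException("Unable to parse row [" + intermediateRow + "]", e);
        }
        if (inputRows.isEmpty()) {
            throw new ParseException("Unable to parse [" + intermediateRow + "] as the row is empty");
        }
        return inputRows;
    }

    public static void main(String[] args) {
        try {
            parseInputRows(Collections.emptyIterator(), "");
        } catch (ParseException e) {
            System.out.println("empty row -> ParseException: " + e.getMessage());
        }
    }
}
```

With this shape, the caller's existing ParseException accounting applies to empty rows the same way it applies to malformed ones.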

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

Member

@clintropolis clintropolis left a comment


change lgtm 👍

@Override
protected List<InputRow> parseInputRows(String intermediateRow) throws IOException, ParseException
{
List<InputRow> inputRows;
Member


nit: final

Contributor Author


Done

Comment on lines +97 to +99
inputRows = FluentIterable.from(() -> delegate)
.transform(jsonNode -> MapInputRowParser.parse(inputRowSchema, flattener.flatten(jsonNode)))
.toList();
Member


nit: this formatting seems off (intellij moves it in line with the .from when i pull your branch locally)

Contributor Author


Fixed.

//throw unknown exception
throw e;
}
if (CollectionUtils.isEmpty(inputRows)) {
Member


CI is failing asking for

<dependency>
  <groupId>commons-collections</groupId>
  <artifactId>commons-collections</artifactId>
  <version>3.2.2</version>
</dependency>

to be added to the pom of druid-core. You could instead use Druid's org.apache.druid.utils.CollectionUtils.isNullOrEmpty, which does the same check.
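For reference, a minimal sketch of what the suggested check does (the class name below is hypothetical; the real utility is org.apache.druid.utils.CollectionUtils.isNullOrEmpty, which avoids adding a commons-collections dependency to druid-core):

```java
import java.util.Collection;
import java.util.Collections;

public class NullOrEmptyCheck {
    // Equivalent of the null-or-empty check provided by Druid's own
    // CollectionUtils, so no external collections library is needed.
    public static boolean isNullOrEmpty(Collection<?> c) {
        return c == null || c.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(isNullOrEmpty(null));                             // true
        System.out.println(isNullOrEmpty(Collections.emptyList()));          // true
        System.out.println(isNullOrEmpty(Collections.singletonList("row"))); // false
    }
}
```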

Contributor Author


Done

Member

@clintropolis clintropolis left a comment


🤘

Contributor

@abhishekagarwal87 abhishekagarwal87 left a comment


LGTM

@FrankChen021
Member

I'm curious why we don't skip the empty rows instead of throwing a ParseException?

@suneet-s
Contributor

suneet-s commented Mar 9, 2021

Is it possible to add an integration test for this? Maybe update SyntheticStreamGenerator#run to send a null event a configurable number of times per run - then the tests can check how many parse exceptions were thrown.

I think a test like this can help flush out whether similar issues exist with other data formats... To be fair, I don't know why a null event would ever end up in a stream

@maytasm
Contributor Author

maytasm commented Mar 9, 2021

I'm curious why we don't skip the empty rows instead of throwing a ParseException?

This PR simply fixes the regression caused by #10383 and is not intended to change or redesign any behavior. Before #10383, streaming ingestion threw a ParseException for an empty row, and the current behavior of JsonLineReader (batch ingestion) is also to throw a ParseException for an empty row. After fixing the regression (which at least lets the streaming task proceed past empty rows as it did before #10383), we can have a separate discussion on whether to change the behavior to simply skip empty rows instead.

@maytasm
Contributor Author

maytasm commented Mar 9, 2021

Is it possible to add an integration test for this? Maybe update SyntheticStreamGenerator#run to send a null event a configurable number of times per run - then the tests can check how many parse exceptions were thrown.

I think a test like this can help flush out whether similar issues exist with other data formats... To be fair, I don't know why a null event would ever end up in a stream

I think it is possible to add an integration test, but it is not easy. For testing JSON (this bug), I don't think an integration test would provide any additional coverage compared to KafkaIndexTaskTest.java and KinesisIndexTaskTest.java. As for covering other data formats, I don't think we have to do that in this PR. This PR is only intended to fix the regression, so I would like to keep the scope small. We can look into adding more integration tests for other data formats in a separate PR.

@suneet-s
Contributor

suneet-s commented Mar 9, 2021

Is it possible to add an integration test for this? Maybe update SyntheticStreamGenerator#run to send a null event a configurable number of times per run - then the tests can check how many parse exceptions were thrown.
I think a test like this can help flush out whether similar issues exist with other data formats... To be fair, I don't know why a null event would ever end up in a stream

I think it is possible to add an integration test, but it is not easy. For testing JSON (this bug), I don't think an integration test would provide any additional coverage compared to KafkaIndexTaskTest.java and KinesisIndexTaskTest.java. As for covering other data formats, I don't think we have to do that in this PR. This PR is only intended to fix the regression, so I would like to keep the scope small. We can look into adding more integration tests for other data formats in a separate PR.

This seems like a reasonable approach. Can you create a GitHub issue for this? Maybe label it with Contributions Welcome or Starter so people from the community can help fill this gap.

@maytasm
Contributor Author

maytasm commented Mar 9, 2021

Is it possible to add an integration test for this? Maybe update SyntheticStreamGenerator#run to send a null event a configurable number of times per run - then the tests can check how many parse exceptions were thrown.
I think a test like this can help flush out whether similar issues exist with other data formats... To be fair, I don't know why a null event would ever end up in a stream

I think it is possible to add an integration test, but it is not easy. For testing JSON (this bug), I don't think an integration test would provide any additional coverage compared to KafkaIndexTaskTest.java and KinesisIndexTaskTest.java. As for covering other data formats, I don't think we have to do that in this PR. This PR is only intended to fix the regression, so I would like to keep the scope small. We can look into adding more integration tests for other data formats in a separate PR.

This seems like a reasonable approach. Can you create a GitHub issue for this? Maybe label it with Contributions Welcome or Starter so people from the community can help fill this gap.

Done. #10971

@maytasm maytasm merged commit 4dd22a8 into apache:master Mar 9, 2021
@maytasm maytasm deleted the IMPLY-6187 branch March 9, 2021 20:12
@clintropolis clintropolis added this to the 0.22.0 milestone Aug 12, 2021