Fix ingestion failure of pretty-formatted JSON message#10383
Fix ingestion failure of pretty-formatted JSON message#10383jihoonson merged 26 commits intoapache:masterfrom
Conversation
|
CI reports some failures of integration tests, I'll check it later |
|
Once a json text line is ill-formed, the I don't know whether jackson provides anyway to skip the ill-formed text, but I guess there's no such API for a parser. |
|
@FrankChen021 I think we should add integration tests for this. Consider updating the data files in The CI failures look legit. I re-triggered the failing integration test since it looked like it might have been flaky. |
Hmm, this is definitely a problem. When we're reading JSON from a file, we should be skipping the "lines" that aren't parseable, and marking them as unparseable. I guess that ObjectMapper API was too good to be true 🙂 Maybe we can do one of the following two things. Option 1: After an error we can manually skip to the line that starts with Option 2: Introduce a IMO Option 2 sounds nicer, since it doesn't involve doing weird stuff with the ObjectReader APIs to try to skip unparseable data. |
|
@FrankChen021 good catch. The option 2 among @gianm's suggestions sounds better to me. The option 1 sounds hard to actually implement since there will be lots of edge cases we should handle. Regarding integration tests, I think it would depend on what this PR is going to change. If it needs to modify some behavior in interaction between supervisor and tasks (for example, if the supervisor should set the |
|
@gianm @jihoonson Thanks for the suggestions. Option 2 is much easier for us to control. I would like to do it as the following. As parameter |
|
@FrankChen021 it sounds good to me 🙂 |
|
Marked as WIP temporarily.
It would take some time to handle this problem. |
dc5d59c to
1c50b01
Compare
|
@FrankChen021 hmm, do you think it would be better to change the behavior of the sampler? Maybe its behavior should match with the actual data reader. |
|
@jihoonson I intended to do that, but after some deep investigation of original implementation, I found that it was not the root cause of previous problem. As a result, exception handling in the new The PR is now ready for review. I would like hear your voices with regarding to the pass of value of |
jihoonson
left a comment
There was a problem hiding this comment.
@FrankChen021 thanks for the update. I think there is an assumption in the sampler design which should be modified to support the new requirement for parsing multiple JSON objects in an input chunk. I left more detailed comments in line.
|
@jihoonson The patch has been submitted. The change of signature of |
jihoonson
left a comment
There was a problem hiding this comment.
@jihoonson The patch has been submitted. The change of signature of
IntermediateRowParsingReader#toMapinvolves changes on all existing subclasses, which are reported insufficient branch or line coverage by CI.
@FrankChen021 thanks! I left some more comments. For the CI failure, I think we can ignore the test coverage bot for the interface change. However, it seems like there are missing branches in IntermediateRowParsingReader and the new JsonReader as well. Can you please check it?
Diff coverage statistics:
------------------------------------------------------------------------------
| lines | branches | functions | path
------------------------------------------------------------------------------
| 100% (5/5) | 100% (2/2) | 95% (19/20) | org/apache/druid/data/input/impl/JsonInputFormat.java
| 77% (17/22) | 25% (1/4) | 94% (49/52) | org/apache/druid/data/input/impl/JsonReader.java
| 81% (9/11) | 100% (0/0) | 90% (58/64) | org/apache/druid/data/input/impl/JsonLineReader.java
| 0% (0/1) | 100% (0/0) | 50% (1/2) | org/apache/druid/data/input/impl/DelimitedValueReader.java
| 0% (0/1) | 100% (0/0) | 100% (2/2) | org/apache/druid/data/input/impl/RegexReader.java
| 72% (8/11) | 33% (2/6) | 62% (5/8) | org/apache/druid/data/input/IntermediateRowParsingReader.java
| 0% (0/1) | 100% (0/0) | 50% (1/2) | org/apache/druid/data/input/avro/AvroOCFReader.java
| 0% (0/1) | 100% (0/0) | 0% (0/1) | org/apache/druid/data/input/orc/OrcReader.java
| 100% (1/1) | 100% (0/0) | 100% (1/1) | org/apache/druid/data/input/parquet/ParquetReader.java
------------------------------------------------------------------------------
Total diff coverage:
- lines: 74% (40/54)
- branches: 41% (5/12)
- functions: 89% (136/152)
ERROR: Insufficient branch coverage of 41% (5/12). Required 50%.
jihoonson
left a comment
There was a problem hiding this comment.
Hi @FrankChen021, thank you for your patience. I think we are almost there. Please check my last comments.
BTW, now there is only one CI failure:
Diff coverage statistics:
------------------------------------------------------------------------------
| lines | branches | functions | path
------------------------------------------------------------------------------
| 0% (0/1) | 100% (0/0) | 100% (1/1) | org/apache/druid/metadata/input/SqlReader.java
------------------------------------------------------------------------------
Total diff coverage:
- lines: 0% (0/1)
- branches: 100% (0/0)
- functions: 100% (1/1)
ERROR: Insufficient line coverage of 0% (0/1). Required 50%.
This is because of the interface change you made for sampler, which means the unit test is missing for sampler for SqlReader. I know the missing test is not your fault, but would you mind adding one? It would be pretty straightforward. You can add one in SqlInputSourceTest by mostly replicating testSingleSplit(). All you need to modify are calling sqlReader.sample() instead of read() and verifying InputRowListPlusRawValuess insead of InputRows. But I totally understand even if you don't want to do it in this PR.
There was a problem hiding this comment.
This doesn't still seem correct to me.. The purpose of returning rawColumn and exception in InputRowListPlusRawValues is showing them in the sampler altogether, so that users will debug their data and input format easily. To do that, we should show exactly what the data was when a ParseException is thrown. Now, thinking about the new behavior parsing a list of JSONs into InputRows, row will contain the list of JSONs here. If a ParseException was thrown while parsing one of them, we should return one InputRowListPlusRawValues which contains row (the whole list of JSONs) and the exception because we don't know exactly in which JSON in the list the ParseException was thrown from. For this, I still think we should change rawValues in InputRowListPlusRawValues to be List<Map<String, Object>>. I understand that you don't want to touch it as it is widely used in unit tests, but I'm not sure if we can fix this without touching it.
There was a problem hiding this comment.
I agree with you.
rawValues in InputRowListPlusRawValues should also be a list to correspond the list of inputRows. Since the rawValues is also passed to SamplerResponseRow when exception occurs, to change rawValues from Map to List<Map> also involves changes of the web console. I don't know if the change of front-end should also be included in this PR.
There was a problem hiding this comment.
Good point. I'm not sure if we have a unit test for parsing sampler response. If we have, the CI will fail and we need to fix it in this PR together. Otherwise, I'm OK with doing the fix in a follow-up. @vogievetsky do you know whether or not we have such a unit test?
There was a problem hiding this comment.
I think we should set both rows and rawColumnsList in InputRowListPlusRawValues in this case, so that users will learn how they are different.
|
The branch has been rebased on master because there're some conflicts with it. @jihoonson Here're some explanation on the latest changes 1st, a new method 2nd 3rd, there's no changes made on existing |
1b43469 to
a9eb12a
Compare
jihoonson
left a comment
There was a problem hiding this comment.
@FrankChen021 thanks, sounds good! I reviewed the latest change, but have a concern with potential duplicate rows in the sampler result.
| @Nullable | ||
| public Map<String, Object> getRawValues() | ||
| { | ||
| return CollectionUtils.isNullOrEmpty(rawValues) ? null : rawValues.get(0); |
There was a problem hiding this comment.
nit: it would be better to use Iterables.getOnlyElement(rawValues) to make sure that you are getting the only element.
There was a problem hiding this comment.
But Iterables.getOnlyElement does not check whether the given input is null or not. If the given input is null, there will be a NPE.
There was a problem hiding this comment.
I meant, for rawValues.get(0) to make sure there is only one element in there if this method is called.
| */ | ||
| public static InputRowListPlusRawValues ofList(@Nullable List<Map<String, Object>> rawColumnsList, | ||
| @Nullable List<InputRow> inputRows, | ||
| ParseException parseException) |
There was a problem hiding this comment.
ParseException is Nullable.
| responseRows.addAll(rawColumnsList.stream() | ||
| .map(rawColumns -> new SamplerResponseRow(rawColumns, null, true, e.getMessage())) | ||
| .collect(Collectors.toList())); |
There was a problem hiding this comment.
Can this introduce duplicate rows in responseRows? Suppose you have 3 rawColumns in rawColumnsList including an unparseable row at the second position. The first row will be added to both index and thus a new SamplerResponseRow of the first rawColumns will be added to responseRows. But for the second row, index.add() will throw a ParseException which will execute these lines. In this case, 2 duplicate rawColumns of the first row will be added in responseRows.
Looking at what JsonReader does, it seems throwing away the whole intermediateRow when there is any unparseable row. The sampler behavior should match to the actual ingestion.
There was a problem hiding this comment.
I notice that index.add does not throw ParseException but returns the exception in its returning result. The previous version of InputSourceSampler checks the result to rethrow this exception, while the change introduced by #10336 deletes these exception related code, which means the code here won't throw ParseException.
Of course, there's possibility that the code in for-loop throws ParseException in future. I've made little changes to avoid this potential situation to happen. Let's check it once the CI passes.
BTW, my local branch has failed to build since last time it was rebased on master, I have to reply on the CI to check if there're any test case failures.
There was a problem hiding this comment.
I notice that
index.adddoes not throwParseExceptionbut returns the exception in its returning result. The previous version ofInputSourceSamplerchecks the result to rethrow this exception, while the change introduced by #10336 deletes these exception related code, which means the code here won't throw ParseException.Of course, there's possibility that the code in for-loop throws
ParseExceptionin future. I've made little changes to avoid this potential situation to happen. Let's check it once the CI passes.
Oops, good point. I forgot what I have done in #10336.. I think index.add() should never throw parseException directly, but return them in IncrementalIndexAddResult. We can add this contract on the Javadoc of IncrementalIndex.add(). I suggest removing the catch clause here to avoid future confusion (there is no point in catching exceptions which cannot be thrown here).
There was a problem hiding this comment.
Got it. BTW, should we handle the exception returned in IncrementalIndexAddResult by adding the corresponding row to responseRows ?
There was a problem hiding this comment.
Yes, I think so because the sampler result should match to what will actually be ingested to Druid.
There was a problem hiding this comment.
A new case testIndexParseException in InputSouceSamplerTest has been added to check whether the exception returned by index.add has been processed properly. And also some little changes are applied in IncrementalIndex.getCombinedParseException and SampleInputRow to make exception message more accurate.
|
@FrankChen021 thanks. I left my last comments. Everything else looks good to me. |
|
This pull request introduces 1 alert when merging c1157d2 into 9c51047 - view on LGTM.com new alerts:
|
jihoonson
left a comment
There was a problem hiding this comment.
@FrankChen021, thanks for addressing comments! I have 2 major comments on the test side though. Please take a look.
| if (rawColumnsList != null) { | ||
| // add all rows to response | ||
| responseRows.addAll(rawColumnsList.stream() | ||
| .map(rawColumns -> new SamplerResponseRow(rawColumns, null, true, parseException.getMessage())) | ||
| .collect(Collectors.toList())); |
There was a problem hiding this comment.
nit: with this change, when a parseException is thrown while parsing a list of rows in one message, those rows in the message will be added as separate rows in responseRows but with the same parseException. This can be confusing since the parseException error message seem irrelevant to the associated rawColumns in the same responseRow. IMO, the better fix in this case would be storing the whole rawColumnsList in one SamplerResponseRow. Then, the parseException can indicate that it was thrown while parsing one of the rows in rawColumnsList. However, this requires a change on the web console side as well. I'm OK with fixing this in a follow-up PR.
There was a problem hiding this comment.
Yes, returning the whole rawColumnsList involves some changes at the web-console side which is currently out of my ability. Thanks for understanding.
| //well-formed | ||
| new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8(wellformed)), | ||
| //ill-formed | ||
| new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("illformed")), |
There was a problem hiding this comment.
Did you intend StringUtils.toUtf8(illformed)? The variable illformed is not in use.
There was a problem hiding this comment.
Yes, illformed here should be a variable instead of a text string.
| } | ||
|
|
||
| @Test | ||
| public void testIndexParseException() throws IOException |
There was a problem hiding this comment.
Thanks for adding this test! Similar to https://github.com/apache/druid/pull/10383/files#diff-ef25ac1cc1f275b47b939b65e1d0c8b8b8512aeada52d06b8541b8f381df03eeR2731, could you please add a unit test for sampling a block of multiple JSON strings? The unit test can be run only when parserType is STR_JSON. We usually just return in the unit test when the parameter is not what we want to test with. It would be nice if the test verifies the followings:
- The sampler response when there is no parseException with a list of multiple JSON strings.
- The sampler response when there is a parseException thrown while parsing a list of multiple JSON strings. Maybe you can improve this unit test to do it as well.
There was a problem hiding this comment.
A block of multiple json strings is only supported in stream ingestion as we discussed above, that's also why the test cases are added in kafka-indexing-service module.It's implemented by setting the new property on JsonInputFormat in RecordSupplierInputSource.
For test cases for InputSourceSampler here, the input source is InlineInputSource which always uses InputEntityIteratingReader even when input format is JsonInputFormat, there's no chance to test that. So I think we don't need add test cases here.
There was a problem hiding this comment.
I think we need such tests because KafkaIndexTaskTest only covers the actual indexing side, but not the sampling side. Without those tests, we could break the sampler without being noticed.
For test cases for
InputSourceSamplerhere, the input source isInlineInputSourcewhich always usesInputEntityIteratingReadereven when input format isJsonInputFormat, there's no chance to test that. So I think we don't need add test cases here.
Maybe this is not a good place for the new tests, but you don't have to use the InlineInputSource for them. Rather, you should use RecordSupplierInputSource as you mentioned. Since RecordSupplierInputSource accepts the RecordSupplier interface as a parameter which is the data supplier, I think you could implement a mock for testing (I don't think we should test Firehose as it's deprecated). Or do you see some reason you cannot? If it's hard, I'm also OK with adding them in a follow-up.
There was a problem hiding this comment.
I think we need such tests because
KafkaIndexTaskTestonly covers the actual indexing side, but not the sampling side. Without those tests, we could break the sampler without being noticed.
I forgot that test cases in KafkaIndexTaskTest only cover the indexing process. It's OK for me to add such tests.
BTW, integration tests failed, I don't think it's related to changes in this PR, and the failure can also be seen in my other PR, is there any other problem ?
There was a problem hiding this comment.
Thanks for understanding. Do you want to add those tests in this PR? For the integration test failure, I don’t think it’s relevant to the changes in this PR. I just retriggered it.
There was a problem hiding this comment.
I've added a test testMultipleJsonStringInOneBlock, please check it.
jihoonson
left a comment
There was a problem hiding this comment.
@FrankChen021 thanks for adding a test! All changes good to me, but please fix the CI.
| String.join("", STR_JSON_ROWS), | ||
|
|
||
| // exclude the last line to form a legal json block | ||
| String.join("", STR_JSON_ROWS.stream().limit(STR_JSON_ROWS.size() - 1).collect(Collectors.toList())) |
There was a problem hiding this comment.
[ERROR] indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java:1221 -- Can be replaced with 'Collectors.joining'
Seems like the Intellij Inspection CI doesn't like this line.
There was a problem hiding this comment.
Every time I push a commit to a branch that is being merge, I run the test cases in the module which contains the changes in that commit. If it's OK, I'll push the commit. But I find that there's a high probability that CI fails. Sometime it's related to inspection check, sometimes it's caused by failures of test cases in other modules, sometime it's about dependency check, sometime it has something with license check.
I wonder what steps do you follow to check before push a commit ? Do you run all the cases in all modules ? Or is there a simple way to run the checks mentioned above ?
There was a problem hiding this comment.
could you re-trigger the CI ? The failure does not related to changes in the PR.
Failed with bad exit code during 'Extraction'
[ERROR] Failed to execute goal on project druid-sql: Could not resolve dependencies for project org.apache.druid:druid-sql:jar:0.21.0-SNAPSHOT: Failed to collect dependencies at org.apache.calcite:calcite-core:jar:1.21.0 -> org.codehaus.janino:janino:jar:3.0.11: Failed to read artifact descriptor for org.codehaus.janino:janino:jar:3.0.11: Could not transfer artifact org.codehaus.janino:janino-parent:pom:3.0.11 from/to central (https://repo.maven.apache.org/maven2): Failed to transfer file https://repo.maven.apache.org/maven2/org/codehaus/janino/janino-parent/3.0.11/janino-parent-3.0.11.pom with status code 503 -> [Help 1]
There was a problem hiding this comment.
Every time I push a commit to a branch that is being merge, I run the test cases in the module which contains the changes in that commit. If it's OK, I'll push the commit. But I find that there's a high probability that CI fails. Sometime it's related to inspection check, sometimes it's caused by failures of test cases in other modules, sometime it's about dependency check, sometime it has something with license check.
I wonder what steps do you follow to check before push a commit ? Do you run all the cases in all modules ? Or is there a simple way to run the checks mentioned above ?
@FrankChen021 That's what I usually do as well. To be honest, unexpected CI failures make me annoyed too 😅 You can run those checks on your own by running the same command as what Travis runs. You may want to set up some pre-commit/post-commit hooks. The best would be some automatic correction for trivial issues, but I'm not sure if there is such a tool available which is matured and reliable enough.
|
I'm merging this PR. Thanks @FrankChen021! |
I appreciate your review and comments very much! |
* scaffolding * readme * adjust * more better, janky heap metadata store, primitive job queue that can submit to overlord, it works - sort of * test scaffolding * move InputFormat into IngestSchema * imply-5135 Create & list ingest tables * Addressed PR comments * Removed bean IngestTable * job processing + sql metadata job table (#78) * Add indexed-table-loader (#65) * Add indexed-table-loader * Fix checkstyle * Fix intelliJ inspections * Fix analyze dependencies * fix license check job * Add imply-druid-security (#66) * Add imply-druid-security * fix checkstyle * Fix analyze dependencies * Fix license check job * Update license header for all imply extensions * fix intelliJ inspections * code review * modify access to protected SQLMetadataConnector methods to allow extensions to create SQL metadata tables using implementation specific constructs (payload type, serial type, etc) (apache#10573) * Correct getRandomBalancerSegmentHolderTest (apache#10569) * Add missing docs for timeout exceptions (apache#10554) * Add missing docs for timeout exceptions * Add info on auth failures * Fix ingestion failure of pretty-formatted JSON message (apache#10383) * support multi-line text * add test cases * split json text into lines case by case * improve exception handle * fix CI * use IntermediateRowParsingReader as base of JsonReader * update doc * ignore the non-immutable field in test case * add more test cases * mark `lineSplittable` as final * fix testcases * fix doc * add a test case for SqlReader * return all raw columns when exception occurs * fix CI * fix test cases * resolve review comments * handle ParseException returned by index.add * apply Iterables.getOnlyElement * fix CI * fix test cases * improve code in more graceful way * fix test cases * fix test cases * add a test case to check multiple json string in one text block * fix inspection check * Add TravisCI job that builds and tests on ARM64 CPU architecture (apache#10562) * Ensure Krb auth before killing YARN apps in graceful shutdown (apache#9785) * job processing + sql metadata * Web console: fix data loader schema table column ordering bug and other polish (apache#10588) * remove unused fields * keep tables live * advanced * fix schema view * better indication * tests pass * Show more instead of show advanced * fix tests * extract dynamic configs * update snapshots * fix issues * update snapshot * reword without > * some javadoc * modify druid.historical.cache.maxEntrySize property in Unified format (apache#10590) Co-authored-by: yuezhang <yuezhang@freewheel.tv> * Fix license header for imply extensions (#76) * Fix license header for imply extensions * arm64 packaging should use jdk8 * maybe this time * jobs and states and status and whatever * use indexing client and coordinator client instead of leader client * always running * simplify * fix readme * Add zero period support to TIMESTAMPADD (apache#10550) * Allow zero period for TIMESTAMPADD * update test cases * add empty zone test case * add unit test cases for TimestampShiftMacro * add -Pimply-saas distribution profile, table exists check * update readme Co-authored-by: Suneet Saldanha <suneet.saldanha@imply.io> Co-authored-by: Lucas Capistrant <capistrant@users.noreply.github.com> Co-authored-by: github-actions <github-actions@github.com> Co-authored-by: Atul Mohan <atulmohan.mec@gmail.com> Co-authored-by: frank chen <frank.chen021@outlook.com> Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com> Co-authored-by: Suneet Saldanha <suneet@apache.org> Co-authored-by: Vadim Ogievetsky <vadim@ogievetsky.com> Co-authored-by: zhangyue19921010 <69956021+zhangyue19921010@users.noreply.github.com> Co-authored-by: yuezhang <yuezhang@freewheel.tv> * fix style and headers * fix fails * fix auth Co-authored-by: Agustin Gonzalez <agustin.gonzalez@imply.io> Co-authored-by: Suneet Saldanha <suneet.saldanha@imply.io> Co-authored-by: Lucas Capistrant <capistrant@users.noreply.github.com> Co-authored-by: github-actions <github-actions@github.com> Co-authored-by: Atul Mohan <atulmohan.mec@gmail.com> Co-authored-by: frank chen <frank.chen021@outlook.com> Co-authored-by: Martin Grigorov <martin-g@users.noreply.github.com> Co-authored-by: Suneet Saldanha <suneet@apache.org> Co-authored-by: Vadim Ogievetsky <vadim@ogievetsky.com> Co-authored-by: zhangyue19921010 <69956021+zhangyue19921010@users.noreply.github.com> Co-authored-by: yuezhang <yuezhang@freewheel.tv>
* support multi-line text * add test cases * split json text into lines case by case * improve exception handle * fix CI * use IntermediateRowParsingReader as base of JsonReader * update doc * ignore the non-immutable field in test case * add more test cases * mark `lineSplittable` as final * fix testcases * fix doc * add a test case for SqlReader * return all raw columns when exception occurs * fix CI * fix test cases * resolve review comments * handle ParseException returned by index.add * apply Iterables.getOnlyElement * fix CI * fix test cases * improve code in more graceful way * fix test cases * fix test cases * add a test case to check multiple json string in one text block * fix inspection check
This PR fixes #10259 .
Description
The cause and solution has been discussed completely in the issue.
In this PR,
JsonReaderis provided to read json text for streaming ingestionJsonReaderis renamed toJsonLineReaderfor batch ingestionThis PR has: