KAFKA-5003: StreamThread should catch InvalidTopicException#2747
KAFKA-5003: StreamThread should catch InvalidTopicException#2747mjsax wants to merge 5 commits intoapache:trunkfrom
Conversation
|
Call for review @dguy @enothereska @bbejeck |
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
|
@mjsax is there any test failing that we can use to check if this works as expected? Thanks. |
There was a problem hiding this comment.
nit: maybe change method name to addToResetList since resetting does not occur here?
|
Should we update the |
|
@mjsax 2 minor comments, otherwise LGTM |
|
Updated: renamed method and added test for |
|
thanks @mjsax - LGTM |
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
|
retest this please |
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
dguy
left a comment
There was a problem hiding this comment.
one minor comment otherwise LGTM
There was a problem hiding this comment.
if (loggedTopics.add(topic)) {...}
|
Updated. |
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
|
Retest this please. |
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
There was a problem hiding this comment.
I'm thinking this block should be extracted to a method. It doesn't really have much to do with pollRequests and it would be nicer to read if there was a method like resetPositions or similar
|
@mjsax not sure why the checks are failing. |
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
|
LGTM |
ijuma
left a comment
There was a problem hiding this comment.
Thanks for the PR, left a couple of questions.
| consumer.seekToEnd(ex.partitions()); | ||
| addToResetList(partition, seekToEnd, "stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", "latest", loggedTopics); | ||
| } | ||
| log.info(String.format("stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", this.getName(), partition.topic(), originalReset)); |
There was a problem hiding this comment.
Why did we remove this log line? And what is the behaviour if there's no reset policy defined?
There was a problem hiding this comment.
It is logged in addToResetList. In StreamsConfig we default the reset policy to earliest
There was a problem hiding this comment.
Having a second look, it seems that we throw an exception if the reset policy is not one of the expected values:
if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) {So the code can never reach this line.
There was a problem hiding this comment.
We can reach this line: there are two places the config can be set: globally (called originalReset here) and locally per topic. Thus, this exception here, only happens if both are invalid. We check the originalReset only, if no topic-local setting was specified.
There was a problem hiding this comment.
I am not sure you understood my comment. I am suggesting that the line where the log.info was removed can never be reached. Are you saying that it can be reached?
There was a problem hiding this comment.
Yes. The old line (555) was hit, when no topic-level offset-strategy was specified, but a global one. Note, in Streams, we "remember" the global config but don't provide it to the consumer -- we set consumer strategy to none always to allow for different topic-level reset strategy in the first place (ie, we apply the global one "manually" within Streams)
There was a problem hiding this comment.
Thanks all, I think I now understand the misunderstanding.
| final KStream<String, String> namedTopicsStream = builder.stream(TOPIC_Y, TOPIC_Z); | ||
| final KStream<String, String> pattern1Stream = builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-\\d_1")); | ||
| final KStream<String, String> pattern2Stream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]_1")); | ||
| final KStream<String, String> namedTopicsStream = builder.stream(TOPIC_Y_1, TOPIC_Z_1); |
There was a problem hiding this comment.
Are these implicitly set to EARLIEST? If so, probably worth adding a comment. Same for the other test. Also, do we test the case where the original config is latest or not set?
There was a problem hiding this comment.
They are set to whatever is set in StreamsConfig, default is earliest.
There was a problem hiding this comment.
Don't think a comment is required. Streams people know that default is earliest.
There was a problem hiding this comment.
The point is making it clear what is being tested. Anyway, you didn't reply to the second question: "Also, do we test the case where the original config is latest or not set?"
There was a problem hiding this comment.
We don't have a test -- can add one.
|
@ijuma Thanks for talking a look. Did reply. |
|
@ijuma Added one more test. |
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
|
Refer to this link for build results (access rights to CI server needed): |
| namedTopicsStream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC); | ||
| pattern1Stream.print(); | ||
| pattern2Stream.print(); | ||
| namedTopicsStream.print(); |
There was a problem hiding this comment.
Thx. Missed to remove it...
|
LGTM, merged to trunk. |
We should catch
InvalidTopicExceptionand not justNoOffsetForPartitionException. Also, we need to step through all partitions that might be affected and reset those.