KAFKA-6687: rewrite topology to allow reading the same topic multiple times in the DSL #9582

Merged
ableegoldman merged 9 commits into apache:trunk from ableegoldman:6687-reuse-source-topic
Nov 13, 2020

@ableegoldman
Member

@ableegoldman ableegoldman commented Nov 10, 2020

Needed to fix this on the side in order to more easily set up some experiments, so here's the PR.

Allows a user to create multiple KStreams and/or KTables from the same topic, collection of topics, or pattern. At the moment this isn't possible since we can only consume from a topic once, and each source topic maps to a single source node in the topology. The "fix" is just to rewrite the logical plan and merge any duplicate source nodes into a single node before it gets compiled into the physical topology.

The one exception is when the stream/table are subscribed to an overlapping-but-unequal collection of topics, which I left as future work (with a TODO in the comments describing a possible solution). If the offset reset policy doesn't match we just throw a TopologyException.

Edit: tables are much more complicated, so I opted to restrict things to just multiple KStreams for now and to consider allowing multiple KTables (or KStream + KTable) as followup work.
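
A minimal, self-contained sketch of the merge step described above, not the actual Kafka Streams code. `SketchSourceNode` and `SourceNodeMergeSketch` are hypothetical names, `IllegalStateException` stands in for `TopologyException`, and treating an unset reset policy as different from a set one is an assumption made only for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified stand-in for a DSL source node: the topics it reads,
// an optional offset reset policy, and the names of its downstream children.
final class SketchSourceNode {
    final Set<String> topics;
    final String resetPolicy; // e.g. "earliest", "latest", or null if unset
    final List<String> children = new ArrayList<>();

    SketchSourceNode(Set<String> topics, String resetPolicy, String child) {
        this.topics = new TreeSet<>(topics);
        this.resetPolicy = resetPolicy;
        this.children.add(child);
    }
}

final class SourceNodeMergeSketch {
    // Merges nodes whose topic sets are exactly equal into the first such node.
    // Nodes with overlapping-but-unequal topic sets are left untouched.
    static List<SketchSourceNode> mergeDuplicates(List<SketchSourceNode> nodes) {
        Map<Set<String>, SketchSourceNode> byTopics = new LinkedHashMap<>();
        for (SketchSourceNode node : nodes) {
            SketchSourceNode existing = byTopics.get(node.topics);
            if (existing == null) {
                byTopics.put(node.topics, node);
            } else if (!Objects.equals(existing.resetPolicy, node.resetPolicy)) {
                // The PR throws a TopologyException here; a JDK exception stands in.
                throw new IllegalStateException(
                    "Conflicting offset reset policies for " + node.topics);
            } else {
                // Same topics, same policy: re-parent the children onto one node.
                existing.children.addAll(node.children);
            }
        }
        return new ArrayList<>(byTopics.values());
    }
}
```

In this sketch two nodes reading `topicA` collapse into one node with both children, while a node reading `topicA` + `topicB` stays separate, which mirrors the overlapping-but-unequal exception noted in the description.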

@ableegoldman ableegoldman force-pushed the 6687-reuse-source-topic branch from 76fe912 to e9b0362 Compare November 10, 2020 05:28
@ableegoldman ableegoldman marked this pull request as draft November 10, 2020 18:06
@ableegoldman ableegoldman marked this pull request as ready for review November 11, 2020 00:09
@ableegoldman
Member Author

Call for review @mjsax @cadonna @lct45

Comment on lines 413 to 424
Member Author

Saw this and at first I thought it was broken because it only considers pattern-subscribed topics that happened to explicitly configure an offset reset policy. Unless I'm missing something here, that makes no sense and we should consider all source patterns and whether they overlap.
But then I started thinking, why does it matter if they overlap? Just because one pattern is a substring of another does not mean that they'll match the same topics. So I think that we should actually just remove this restriction altogether. Am I missing anything here?

Member

I agree on the first part.
Regarding the second part, I had similar thoughts when I wrote my comment in mergeDuplicateSourceNodes().
But I might also be missing something here.

@lct45

I might also be missing something, but what's the scenario where one pattern is a substring of another and they don't match the same topics? If you take Bruno's example from earlier of topic* and topi*, topi* would be considered a substring of topic* and they would both match topic A, right? I guess the other scenario is if we have a topic topia A, which would match topi* but not topic*. So it seems like it isn't always true that they'll overlap, but we would want to check whether they do, right?

Member

Pattern topic* is contained in pattern topic*A. However, topic*A matches only a subset of the topics that topic* matches, so the two do not match exactly the same topics. But matching exactly the same topics is a prerequisite for merging the source nodes.

Member Author

I think in this case we were checking whether one pattern's string was a literal substring of another pattern's string, not whether the regexes themselves are substrings. So topi* would not be a substring of topic*, because topi* is not contained literally within the string topic*. It's not doing any smart regex matching, just a dumb literal string comparison.
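
To make the distinction in this thread concrete, here is a small sketch using only `java.util.regex.Pattern`. The class and method names are hypothetical, and the patterns are written as real Java regexes (`topic.*` rather than the shorthand `topic*` used in the comments), which is an assumption about what the shorthand was meant to express.

```java
import java.util.regex.Pattern;

final class PatternOverlapSketch {
    // The "dumb" check: is one pattern's source string literally contained in the other's?
    static boolean literalSubstring(Pattern a, Pattern b) {
        return a.pattern().contains(b.pattern()) || b.pattern().contains(a.pattern());
    }

    // What matters at runtime: do both patterns match this concrete topic name?
    static boolean bothMatch(Pattern a, Pattern b, String topic) {
        return a.matcher(topic).matches() && b.matcher(topic).matches();
    }

    public static void main(String[] args) {
        Pattern topicAny = Pattern.compile("topic.*");
        Pattern topicEndingInA = Pattern.compile("topic.*A");
        Pattern topiAny = Pattern.compile("topi.*");

        // Literal containment, yet the patterns do not match the same topics.
        System.out.println(literalSubstring(topicAny, topicEndingInA)); // true
        System.out.println(bothMatch(topicAny, topicEndingInA, "topicB")); // false

        // No literal containment, yet both patterns match the same topic.
        System.out.println(literalSubstring(topicAny, topiAny)); // false
        System.out.println(bothMatch(topicAny, topiAny, "topicA")); // true
    }
}
```

Both directions fail: literal containment neither implies nor is implied by the patterns matching a common topic, which is consistent with the suggestion to drop the substring-based restriction.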

Member

@cadonna cadonna left a comment

Thank you for the PR, @ableegoldman!

According to the ticket similar work needs to be done for table() and globalTable(). What do you think of adding subtasks to the ticket to track what has already been done and what not?

Here is my feedback.

Comment thread streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java Outdated
Comment on lines 329 to 331
Member

We could avoid the instanceof and the casting if we introduce a RootGraphNode with a method sourceNodes(). Since a root can only have source nodes and state stores as children, we could make the topology code in general a bit more type safe. As far as I can see that would need some additional changes outside the scope of this PR. So, feel free to not consider this comment for this PR and we can do another PR for that.
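
A rough sketch of what such a typed root could look like. All class names here (`PlanNode`, `PlanSourceNode`, `PlanRootNode`) are hypothetical and do not correspond to the existing graph node classes in Kafka Streams; state store children are omitted for brevity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical base type for nodes in the logical plan.
class PlanNode {
    final String name;

    PlanNode(String name) {
        this.name = name;
    }
}

// Hypothetical source node that reads from a single topic.
final class PlanSourceNode extends PlanNode {
    final String topic;

    PlanSourceNode(String name, String topic) {
        super(name);
        this.topic = topic;
    }
}

// Hypothetical typed root: it only accepts source nodes, so callers that walk
// the sources need neither instanceof checks nor casts.
final class PlanRootNode extends PlanNode {
    private final List<PlanSourceNode> sources = new ArrayList<>();

    PlanRootNode() {
        super("root");
    }

    void addSource(PlanSourceNode source) {
        sources.add(source);
    }

    // Read-only, already correctly typed view of the root's source children.
    List<PlanSourceNode> sourceNodes() {
        return Collections.unmodifiableList(sources);
    }
}
```

With a root like this, code that merges duplicate sources could iterate `root.sourceNodes()` directly instead of filtering a generic child list by type.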

Member Author

Yeah I think that's a fair point but I would prefer to keep the scope of this PR as small as possible for now. Maybe @lct45 could pick this up on the side once this is merged?

@lct45

SGTM

Comment on lines 326 to 327
Member

Just to be clear. This improves the situation but it is not a complete solution, right? Assume we have a topic topicA. Patterns topic* and topi* both match topicA but they are different when compared with this comparator. In that case a TopologyException would be thrown in the InternalTopologyBuilder, right?

Member Author

Yes to all of that: this PR improves some situations, but not all. Specifically you would still get a TopologyException if (a) subscribing to overlapping but not equal collection of topics, (b) subscribing to a topic and to a pattern that matches said topic, and (c) subscribing to two (or more) patterns that match the same topic(s).

Case (c) is what you described; I just wanted to list them all here for completeness. Here's my take on what we can and should reasonably try to tackle:

(a) is easily detected, easily worked around, and easy for us to fix. It results in a "compile time" exception (meaning when the topology is compiled, not the program), which users can quickly detect and work around if need be by rewriting the topology themselves. The fix is relatively straightforward but very low priority, so I plan to just file a followup ticket for this for now.
(b) is easily detected (you get a compile time exception) and possible to work around, but difficult to solve. I think in all cases a user could find a way around this issue by some combination of topology rewriting and Pattern manipulation or topic renaming, depending on what exactly they're trying to achieve. Of course there's no way for us to detect what an arbitrary user is trying to do in this case, so I don't see any path forward to making this case possible. No plans to file a followup ticket.
(c) is difficult to detect, might be possible to work around, and is probably very complicated to actually fix. Unfortunately, in this case you only get a run-time exception, since there's no way of knowing ahead of time which topics will or will not be created. And I'm thinking that determining whether two regexes will both match any possible string may be unsolvable... so, no followup ticket planned for this.
WDYT?
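
The three cases can be sketched as simple checks. This is illustrative only: the class and method names are hypothetical, and the real detection happens inside the topology builder rather than through helpers like these. Note that the check for case (c) needs the set of topics that actually exist, which is why it can only surface at run time.

```java
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

final class SubscriptionConflictSketch {
    // Case (a): two topic collections share a topic but are not identical.
    static boolean overlappingButUnequal(Set<String> first, Set<String> second) {
        return !first.equals(second) && !Collections.disjoint(first, second);
    }

    // Case (b): a topic subscribed to by name is also matched by a subscribed pattern.
    static boolean topicMatchedByPattern(String topic, Pattern pattern) {
        return pattern.matcher(topic).matches();
    }

    // Case (c): topics matched by more than one subscribed pattern. This depends
    // on which topics exist, so it cannot be decided when the topology is built.
    static Set<String> topicsMatchedByMultiplePatterns(List<Pattern> patterns,
                                                       Set<String> existingTopics) {
        Set<String> conflicts = new TreeSet<>();
        for (String topic : existingTopics) {
            long matches = patterns.stream()
                .filter(p -> p.matcher(topic).matches())
                .count();
            if (matches > 1) {
                conflicts.add(topic);
            }
        }
        return conflicts;
    }
}
```

For example, with the patterns `topic.*` and `topi.*` and existing topics `topicA` and `topiaA`, only `topicA` is reported, since `topiaA` is matched by just one of the two patterns.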

Member

Thank you for the list of issues. I agree on all points.

Comment thread streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java Outdated
Comment thread streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java Outdated
Comment thread streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java Outdated
@ableegoldman ableegoldman changed the title from "KAFKA-6687: rewrite topology to allow creating multiple KStreams from the same topic" to "KAFKA-6687: rewrite topology to allow reading the same topic multiple times in the DSL" Nov 13, 2020
@ableegoldman
Member Author

> According to the ticket similar work needs to be done for table() and globalTable(). What do you think of adding subtasks to the ticket to track what has already been done and what not?

Thanks for the review, @cadonna. This fix actually does work for table() as well as stream(); I've updated the title and added unit tests to reflect this. As for the global table, there is already a separate ticket for this, which should be linked from KAFKA-6687.

I do think there's some possible followup work to further improve the situation (see my comment above), but I would say that it's different enough to merit creating separate followup tickets rather than subtasks of this one. Lmk what you think.

Comment on lines 906 to 910
Member

Could you please extract this part in its own method since we use it also in a couple of other tests?

@cadonna
Member

cadonna commented Nov 13, 2020

> I do think there's some possible followup work to further improve the situation (see my comment above), but I would say that it's different enough to merit creating separate followup tickets rather than subtasks of this one. Lmk what you think.

Fair enough. Let's do it as you say.

Member

@cadonna cadonna left a comment

LGTM!

@lct45 lct45 left a comment

LGTM pending Bruno's final comments

@ableegoldman ableegoldman force-pushed the 6687-reuse-source-topic branch from f8b95c5 to cf32d69 Compare November 13, 2020 19:28
Comment on lines +347 to +349
// TODO we only merge source nodes if the subscribed topic(s) are an exact match, so it's still not
// possible to subscribe to topicA in one KStream and topicA + topicB in another. We could achieve
// this by splitting these source nodes into one topic per node and routing to the subscribed children
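
One way to picture the idea in this TODO: give every topic its own source node and route each topic's records to every child that subscribed to it. The sketch below is only an assumption about how that routing table might be computed; `PerTopicRoutingSketch` and `routeByTopic` are hypothetical names, and nothing like this is part of the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class PerTopicRoutingSketch {
    // Input: child name -> topics that child subscribed to.
    // Output: topic -> children that should receive that topic's records,
    // i.e. one source per topic, fanned out to the subscribed children.
    static Map<String, List<String>> routeByTopic(Map<String, Set<String>> subscriptions) {
        Map<String, List<String>> routes = new TreeMap<>();
        for (Map.Entry<String, Set<String>> entry : subscriptions.entrySet()) {
            for (String topic : entry.getValue()) {
                routes.computeIfAbsent(topic, t -> new ArrayList<>()).add(entry.getKey());
            }
        }
        return routes;
    }
}
```

With one KStream on `topicA` and another on `topicA` + `topicB`, the `topicA` source would feed both streams and the `topicB` source only the second, so the overlapping-but-unequal case would no longer require two source nodes for the same topic.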

@ableegoldman
Member Author

Two unrelated flaky test failures:
EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota

@ableegoldman ableegoldman merged commit cb3dc67 into apache:trunk Nov 13, 2020
@ableegoldman
Member Author

Merged to trunk

ableegoldman added a commit that referenced this pull request Dec 1, 2020
…opics (#9609)

Followup to PR #9582: need to restrict the DSL so that only KStreams, not KTables, can be created from the same set of topic(s). Allowing multiple KTables can be tackled as followup work.

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bruno Cadonna <cadonna@confluent.io>
3 participants