KAFKA-6687: restrict DSL to allow only Streams from the same source topics (#9609)
Conversation
cadonna
left a comment
@ableegoldman Thanks for the PR!
I have some minor comments, in addition to @mjsax's comments.
    public Set<String> topicNames() {
        return new HashSet<>(topicNames);
    }
cadonna:
Should we already demand a set of topics in the constructors of SourceGraphNode() and its children?
ableegoldman:
I'm not sure I understand exactly what you're asking, but I made a few changes to this topic collection/method. Please lmk if it hasn't addressed your question
cadonna:
I like your changes. What I meant is that we could change the constructor of SourceGraphNode to:
public SourceGraphNode(final String nodeName,
final Set<String> topicNames,
final ConsumedInternal<K, V> consumedInternal)
and the one of StreamSourceNode to:
public StreamSourceNode(final String nodeName,
final Set<String> topicNames,
final ConsumedInternal<K, V> consumedInternal)
In this way, we have a set of topics as soon as possible in the code path from the public API. I think this makes it clearer that it is not possible to have duplicates of topics internally.
To keep this PR small, I would propose to just do the changes for SourceGraphNode, and do the other changes in a separate PR.
ableegoldman:
Ah ok you meant making it a Set vs a Collection -- I do agree with the principle, and I did push the Set-ification of the topics up one level so that the actual class field is a Set. But I don't think it's really worth it to push it up another layer and Set-ify the constructor argument. For one thing we would just have to do the same conversion to a Set but in more places, and more importantly, the actual callers of the constructor don't care at all whether it's a Set or any other Collection. So I think it actually does make sense to convert to a Set inside the constructor body
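A minimal sketch of the "convert inside the constructor body" approach being discussed. The class name follows the discussion, but the body is illustrative only and omits the real parameters such as `ConsumedInternal` — it is not the actual Kafka Streams source:

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch: the constructor accepts any Collection for the
// callers' convenience, while the field is a Set, so duplicate topic
// names are collapsed at construction time.
abstract class SourceGraphNode<K, V> {
    private final String nodeName;
    private final Set<String> topicNames;

    SourceGraphNode(final String nodeName, final Collection<String> topicNames) {
        this.nodeName = nodeName;
        this.topicNames = new HashSet<>(topicNames); // "Set-ification" inside the body
    }

    public Set<String> topicNames() {
        return new HashSet<>(topicNames); // defensive copy
    }
}
```

With this shape, callers can keep passing whatever `Collection` they already have, and the invariant (no duplicate topics) is enforced in exactly one place.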
cadonna:
Fair enough, and I think this is nothing urgent or absolutely necessary. However, I would like to explain my line of thought. I think the interface of a class should also describe the constraints on the object, and as far as I can see it does not make any sense to pass the same topic name multiple times to a source node.
I do not see an issue with making the same conversion in more places, and actually that is not even true, because the only place where we would do a conversion is in StreamsBuilder#stream(). All other dependent calls create a singleton collection, which can easily be replaced with a singleton set. Actually, I do not understand why StreamsBuilder#stream() takes a collection instead of a set.
I am not sure I can follow your other argument:

> the actual callers of the constructor don't care at all whether it's a Set or any other Collection
Do you refer to the creation of the singleton collection in the callers?
As I said, I do not say we need to follow my proposal. I just wanted to argue in favor of a cleaner and more descriptive interface.
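For illustration, the singleton-collection call sites mentioned above would be a one-word change. These are hypothetical call sites shown with plain JDK collections, not the actual Kafka code:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;

public class SingletonTopics {
    public static void main(final String[] args) {
        // Before: a singleton List passed where a Collection is expected
        final Collection<String> asList = Collections.singletonList("input-topic");
        // After: a singleton Set, which also rules out duplicates by type
        final Set<String> asSet = Collections.singleton("input-topic");

        // Both contain exactly the same single element
        System.out.println(asList.equals(new ArrayList<>(asSet))); // prints "true"
    }
}
```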
ableegoldman:
> All other dependent calls create a singleton collection which can be easily replaced with a singleton set
Ah ok, I didn't notice that. I guess I only looked at StreamsBuilder#stream
> Actually, I do not understand why StreamsBuilder#stream() takes a collection instead of a set.
This I totally agree with. I suspect the intention was just for convenience, so users don't have to do a list->set conversion themselves, but I personally don't find that to be a very strong argument. It doesn't seem worth doing a KIP over, but maybe if we rewrite some large parts of the DSL in the future, we can fix this as well
By "callers" I meant the method body of StreamsBuilder#stream, which doesn't really care whether there are duplicates in the collection because its only job is to pass the topics straight from the user to this source node.
But I see your point. If I touch on some related code in a future PR I can fix this on the side, or I'd be happy to review a PR if you want to submit one. Thanks for the discussion
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.streams.kstream.internals.ConsumedInternal;

    abstract public class SourceGraphNode<K, V> extends StreamsGraphNode {
cadonna:
Can we also rename StreamsGraphNode to GraphNode? The Streams prefix is a bit confusing, IMO, because StreamSourceNode and StreamsGraphNode seem really similar although they are quite different.
ableegoldman:
Ok, but don't come crying when the PR blows up in length 😉 (but yeah that makes sense to me)
cadonna:
I never said you need to do it in this PR 😉. Joking aside, I think in general it would be better to do such things in a separate PR, but when I wrote my comment I completely forgot about that. Sorry about that!
@mjsax @cadonna I addressed your feedback, lmk if there's anything else (to answer your question, Matthias, this fixes the problem because we use
Also, just FYI: since the suggested renaming touched a lot of files, the only logical changes are to the SourceGraphNode, StreamSourceNode, and StreamsBuilderTest classes.
      name,
      aggregateBuilder,
    - streamsGraphNode,
    + graphNode,
wth happened here 🤔
I don't want to spoil the party, but as this is a bug-fix PR that we want to back-port, we should not do massive renaming... Happy to do the rename in a follow-up for
mjsax
left a comment
Overall LGTM. I think it's ok to have the renaming in trunk (and maybe 2.7), but I have doubts about cherry-picking it to 2.6...
From a bug-fix/cherry-picking perspective, it might be best to undo the renaming and cherry-pick the fix to 2.7 and 2.6, such that we can track the fix via git easily, and do a follow-up PR for trunk only.
Feel free to merge.
@mjsax it's not a bugfix PR, this is only going to trunk. Well, technically it is fixing a bug, but that bug was only merged to trunk a few weeks ago. Otherwise I would agree, that would be merge hell 🔥
Merged to trunk
Follow-up to #9582.
Will leave the ability to create multiple KTables from the same source topic as follow-up work. Similarly, creating a KStream and a KTable from the same topic can be tackled later if need be.