KAFKA-6958: Allow to name operation using parameter classes #6410

Merged
bbejeck merged 1 commit into apache:trunk from fhussonnois:KAFKA-6958-SUB-TASK2
Apr 18, 2019

Conversation

@fhussonnois
Contributor

Hi @mjsax @bbejeck

This is the 2nd PR for KIP-307.

NOTE: PR 6409 should be merged first.

Thanks a lot for the review.

@fhussonnois fhussonnois changed the title KAFKA-6958-SUB-TASK2 KAFKA-6958: Allow to name operation using parameter classes Mar 8, 2019
@fhussonnois fhussonnois force-pushed the KAFKA-6958-SUB-TASK2 branch from 55fd810 to fda8784 Compare March 8, 2019 19:56
@bbejeck
Member

bbejeck commented Mar 12, 2019

Java 8 and Java 11 failed; test results already cleaned up

retest this please

@bbejeck bbejeck self-assigned this Mar 12, 2019
@bbejeck bbejeck requested review from guozhangwang and mjsax March 12, 2019 18:06
@bbejeck
Member

bbejeck commented Mar 12, 2019

\cc @vvcephei @ableegoldman for reviews

Member

@bbejeck bbejeck left a comment

Thanks @fhussonnois, I've made a pass and have a few minor comments.

Comment thread streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java Outdated
Member

@bbejeck bbejeck Mar 12, 2019

While I'm in favor of code re-use, in this case, the code in Topic.validate is not too large and could be easily ported to a Named.validate method. By doing so, Kafka Streams can change naming rules as needed.

I realize that Materialized.as uses Topic.validate to validate the name of the store, but I'd suggest updating it to use Named.validate there as well.

NOTE: If we do this, we'd need to update the KIP
EDIT: Actually I'm not sure we'd need to update the KIP as most likely this method would not be publicly accessible.

Member

@bbejeck bbejeck Mar 13, 2019

@fhussonnois thinking about this some more, what is the motivation for doing a validation here for processor names?

When Streams starts up, the AdminClient will attempt to create any internal topics, and the full topic names are validated at that point, so we don't need this check up front.

\cc @guozhangwang

Contributor Author

@bbejeck, actually I followed the same logic as for the Materialized class. In general, we should try to throw the exception as soon as possible. Also, here the exception is thrown while the topology is built, so a bad name will be detected during unit tests. If we do not check the name in the Named/Materialized classes, then an invalid name will only be detected at runtime. Depending on the semantics we want, should an invalid name be a topology exception or an invalid topic exception?

I'm also in favor of duplicating Topic.validate, as this avoids tying the operation name to the topic name in the exception message.

Member

You have a point about checking via unit tests, but when the AdminClient attempts to create the topic, the check is against the full name.

The check here only looks at the user-supplied name, which is only part of the topic name, so we could have a situation, admittedly rare, where the unit tests pass but the full name fails.

So I'm inclined not to have the check here or in the Materialized class. Let's see what others think.
\cc @guozhangwang @mjsax @vvcephei @ableegoldman
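
To illustrate the gap described above, here is a minimal sketch. It assumes the changelog topic for a store is named `<application.id>-<store name>-changelog` and that topic names are limited to 249 characters. The class `FullTopicNameCheck` and its methods are hypothetical and only model the length rule; they are not code from this PR.

```java
import java.util.Arrays;

// Hypothetical illustration; not code from this PR.
final class FullTopicNameCheck {
    // Topic names may be at most 249 characters long.
    static final int MAX_TOPIC_LENGTH = 249;

    private FullTopicNameCheck() { }

    // Assumed layout of a changelog topic name: <application.id>-<store name>-changelog
    static String changelogTopic(final String applicationId, final String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Length rule only; character rules are left out to keep the sketch short.
    static boolean fitsLengthLimit(final String name) {
        return !name.isEmpty() && name.length() <= MAX_TOPIC_LENGTH;
    }

    // Builds a string of n copies of c (works on Java 8).
    static String repeat(final char c, final int n) {
        final char[] chars = new char[n];
        Arrays.fill(chars, c);
        return new String(chars);
    }

    public static void main(final String[] args) {
        final String storeName = repeat('s', 245);
        final String fullName = changelogTopic("orders-app", storeName);
        System.out.println("store name alone fits: " + fitsLengthLimit(storeName));
        System.out.println("full topic name fits:  " + fitsLengthLimit(fullName));
    }
}
```

A unit test that validates only the 245-character store name would pass, while the derived topic name is longer than the limit and would only be rejected when the topic is created.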

Member

I agree that we should check when using AdminClient. However, I also agree that we should check early if possible. (I don't see an issue with double checking.) Sometimes a name is used as part of a topic name, and we can already check for invalid characters here. Could we add a flag like "usedInTopicName" and do a different check accordingly? On the other hand, I don't see a big issue with enforcing the topic naming conventions on processor names too, even if it's just a processor name and not used as part of a topic name.

Member

I've thought about this some more, and the comments from @mjsax and @fhussonnois have convinced me that checking earlier is a good idea as well.

But I still think we should duplicate the current logic and put the name checking logic in the Named class.

Contributor

To re-start this thread, I also feel like we should have our own check for Named operations to use.

  • we may want to make operation names more restrictive than topic names, for example to prevent collisions with automatically-generated partition names
  • the topic validation throws an exception that mentions the name is "an invalid topic name". This statement is nonsense if I'm naming an operator. We should throw an exception that says it's an "invalid operator name" or similar.
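
As a minimal sketch of what such a dedicated check could look like: the code below assumes the operator rules simply mirror the topic rules (non-empty, not "." or "..", at most 249 characters, only ASCII alphanumerics, '.', '_' and '-') and differ only in the error message. `OperatorNames` is a hypothetical name, and `IllegalArgumentException` stands in for whatever exception Streams would throw; this is not the code merged in this PR.

```java
import java.util.regex.Pattern;

// Hypothetical helper; not the class added by this PR.
final class OperatorNames {
    // Assumed to match the topic-name limit so names stay usable inside topic names.
    static final int MAX_NAME_LENGTH = 249;
    private static final Pattern LEGAL_CHARS = Pattern.compile("[a-zA-Z0-9._-]+");

    private OperatorNames() { }

    // Throws if the name breaks a rule; the message talks about operators, not topics.
    static void validate(final String name) {
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("Invalid operator name: it can't be empty");
        }
        if (name.equals(".") || name.equals("..")) {
            throw new IllegalArgumentException("Invalid operator name: it can't be \".\" or \"..\"");
        }
        if (name.length() > MAX_NAME_LENGTH) {
            throw new IllegalArgumentException(
                "Invalid operator name: longer than " + MAX_NAME_LENGTH + " characters");
        }
        if (!LEGAL_CHARS.matcher(name).matches()) {
            throw new IllegalArgumentException("Invalid operator name \"" + name
                + "\": only ASCII alphanumerics, '.', '_' and '-' are allowed");
        }
    }

    // Convenience wrapper for callers that prefer a boolean.
    static boolean isValid(final String name) {
        try {
            validate(name);
            return true;
        } catch (final IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(final String[] args) {
        for (final String candidate : new String[] {"my-filter", "bad name", ".."}) {
            System.out.println(candidate + " -> " + isValid(candidate));
        }
    }
}
```

Keeping the rules in a separate method means they can later be made stricter than the topic rules without touching topic validation.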

Comment thread streams/src/main/java/org/apache/kafka/streams/kstream/Named.java Outdated
Member

According to the KIP, Printed should add a Printed.as(final String name) method.

Contributor Author

@bbejeck If we add a Printed.as method, there is currently no withFile or withSysOut method to set the output stream.

I think we should not add the Printed.as method and should update the KIP instead?

Member

That sounds good to me, and if I recall correctly we had similar reasons for not adding an as method to Suppressed.

\cc @guozhangwang @mjsax @vvcephei @ableegoldman

Member

SGTM.

If we update the KIP, we should send a follow-up email to the VOTE thread summarizing the changes (we can do this after all PRs are merged, in case there are more changes).

Member

Appending -table-source to the names of KTable source operators is not in the KIP, so we'll either need to remove this or update the KIP.

Member

I think we need some suffix, because otherwise we would generate the same name twice, i.e., end up with a naming conflict. The problem is that a KTable results in two processors and we need a name for each.

Contributor Author

@bbejeck actually, the table() method creates one SourceNode and one ProcessorNode. We need a way to differentiate those two nodes. I should update the KIP to mention this particularity.
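
A minimal sketch of why the two nodes need distinct names, assuming a topology rejects a second node registered under an existing name. `NodeRegistry` is a hypothetical stand-in for the topology builder, and which of the two nodes keeps the bare user-supplied name is an assumption of this sketch.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for a topology builder; not code from this PR.
final class NodeRegistry {
    private final Set<String> names = new LinkedHashSet<>();

    // Mimics a topology that refuses two nodes with the same name.
    void add(final String name) {
        if (!names.add(name)) {
            throw new IllegalStateException("Node " + name + " is already added");
        }
    }

    Set<String> names() {
        return names;
    }

    // Registers the two nodes behind one table() call under distinct names.
    static NodeRegistry registerTable(final String userName) {
        final NodeRegistry registry = new NodeRegistry();
        registry.add(userName);                   // source node (assumed to keep the bare name)
        registry.add(userName + "-table-source"); // processor node (assumed to take the suffix)
        return registry;
    }

    public static void main(final String[] args) {
        System.out.println(registerTable("orders").names());
    }
}
```

Without the suffix, the second `add` call would use the same name as the first and fail.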

Member

Ack, I get it now. Thanks for clarifying.

Member

Depending on the decision regarding naming sources for builder.table, we'll need to update this test.

Member

@bbejeck bbejeck left a comment

One more thought; we should have specific tests for Named and NamedInternal

@bbejeck
Member

bbejeck commented Mar 13, 2019

Java 8 passed, Java 11 failed; results already cleaned up

retest this please

@fhussonnois fhussonnois force-pushed the KAFKA-6958-SUB-TASK2 branch 3 times, most recently from 0321745 to 14fa150 Compare March 19, 2019 16:09
Member

@mjsax mjsax left a comment

Did one initial pass. Can make another one after #6409 is merged and this PR is rebased.

Comment thread streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java Outdated
Comment thread streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java Outdated
Comment thread streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java Outdated
Comment thread streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java Outdated
Member

Isn't Joined handled in #6409 already?

Comment thread streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java Outdated
Member

Why do we need this?

Member

Why do we need this?

Contributor Author

The empty constructor is used to build an empty NamedInternal, see NamedInternal.empty()

Member

Maybe just call new NamedInternal(null) instead?

@fhussonnois fhussonnois force-pushed the KAFKA-6958-SUB-TASK2 branch 3 times, most recently from 04f343a to 573c770 Compare March 20, 2019 14:56
Member

@mjsax mjsax left a comment

Overall LGTM.

I am personally not convinced that the pattern we use for NamedInternal is the best one. But it's highly subjective. I would just pass some Strings around -- seems easier to me personally.

Member

nit: e -> fatal

Comment thread streams/src/main/java/org/apache/kafka/streams/kstream/Named.java Outdated
Member

I am personally a little confused about what orElseGenerateWithPrefix means. It's a personal preference, but I don't think it's easy to read. The same goes for suffixWithOrElseGet. (Maybe it's just me not being used to the fancy Java 8 constructs that are mimicked here...)

Curious to hear what others think.

Member

I'm ok with the names, but I don't have a strong opinion. We still have time to address between now and the final PR though.
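
For readers following the naming discussion, here is one possible reading of the two method names, as a minimal sketch. The semantics shown are inferred from the names alone and are an assumption, not confirmed by this thread. `NameSketch`, the `AtomicInteger` counter, the prefixes, and the zero-padded index format are all hypothetical.

```java
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of an optional user-supplied name; not the NamedInternal class itself.
final class NameSketch {
    private final String name; // null means the user gave no name

    NameSketch(final String name) {
        this.name = name;
    }

    static NameSketch empty() {
        return new NameSketch(null);
    }

    // Use the user's name if present; otherwise generate "<prefix><zero-padded index>".
    String orElseGenerateWithPrefix(final AtomicInteger index, final String prefix) {
        return name != null ? name : generate(index, prefix);
    }

    // Use "<user name><suffix>" if a name is present; otherwise generate as above.
    String suffixWithOrElseGet(final String suffix, final AtomicInteger index, final String prefix) {
        return name != null ? name + suffix : generate(index, prefix);
    }

    private static String generate(final AtomicInteger index, final String prefix) {
        return prefix + String.format(Locale.ROOT, "%010d", index.getAndIncrement());
    }

    public static void main(final String[] args) {
        final AtomicInteger index = new AtomicInteger();
        System.out.println(new NameSketch("my-filter").orElseGenerateWithPrefix(index, "KSTREAM-FILTER-"));
        System.out.println(NameSketch.empty().orElseGenerateWithPrefix(index, "KSTREAM-FILTER-"));
    }
}
```

Under this reading both methods behave like `Optional.orElseGet`: the counter only advances when no user-supplied name is available.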

@bbejeck
Member

bbejeck commented Mar 21, 2019

@fhussonnois #6409 is merged, can you rebase this PR?

Also, this PR is very close to merging, can you add the name verification logic into the Named class as we have discussed?

@fhussonnois fhussonnois force-pushed the KAFKA-6958-SUB-TASK2 branch from 573c770 to 3c9f23a Compare March 22, 2019 15:24
@fhussonnois
Contributor Author

@bbejeck @mjsax This PR has been rebased (following #6409). The Named class now contains a validate method to check the user-provided name. This method is also used by Materialized in place of Topic.validate. I've also made some changes to the Topic class to get access to some static fields/methods.

I think we should (for now) enforce the topic naming conventions on processor names, because they are used for naming some metrics too.

I also added simple tests for Named and NamedInternal.

thanks for the review.

Member

@bbejeck bbejeck left a comment

Thanks for the updated PR @fhussonnois!

This looks good to me. If we can address the final comments, we can get this PR merged.

Comment thread clients/src/main/java/org/apache/kafka/common/internals/Topic.java Outdated
@fhussonnois fhussonnois force-pushed the KAFKA-6958-SUB-TASK2 branch from 3c9f23a to 0296f3b Compare March 24, 2019 22:26
Member

Not sure about this test: the title says shouldUseSpecifiedNameForGlobalTableSourceProcessor, but it's asserting the names of state stores. But we can fix this in one of the following PRs.

Member

We should fix right away -- otherwise it might slip.

Member

@bbejeck bbejeck left a comment

Latest updates LGTM. There are a few additional issues, but we should fix them in the next PR.

Member

@mjsax mjsax left a comment

Thanks for rebasing and sorry for the delay in reviewing. Some more follow-ups, mostly nits.

Comment thread streams/src/main/java/org/apache/kafka/streams/kstream/Consumed.java Outdated
Member

Seems not to align with the JavaDoc. I think passing in null should be fine.

Comment thread streams/src/main/java/org/apache/kafka/streams/kstream/Named.java Outdated
Comment thread streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java Outdated
Comment thread streams/src/main/java/org/apache/kafka/streams/kstream/Printed.java Outdated
Comment thread streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java Outdated
Member

Is this change documented in the KIP?

Contributor Author

The KIP has been completed.

@fhussonnois fhussonnois force-pushed the KAFKA-6958-SUB-TASK2 branch from 0296f3b to 50b706b Compare April 11, 2019 09:09
Sub-task required to allow defining custom processor names with the KStreams DSL (KIP-307):
 - update existing classes Consumed/Grouped/Printed/Produced to implement NamedOperation
 - introduce new public class Named
@fhussonnois fhussonnois force-pushed the KAFKA-6958-SUB-TASK2 branch from 50b706b to 6836d5c Compare April 18, 2019 08:31
@bbejeck
Member

bbejeck commented Apr 18, 2019

Java 8 failed with known flaky test kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup, Java 11 passed

retest this please

@bbejeck
Member

bbejeck commented Apr 18, 2019

Failed with Execution failed for task ':core:compileTestScala'. but a PR #6603 has been merged

retest this please

@bbejeck bbejeck merged commit 075b368 into apache:trunk Apr 18, 2019
@bbejeck
Member

bbejeck commented Apr 18, 2019

Merged #6410 into trunk

@bbejeck
Member

bbejeck commented Apr 18, 2019

Thank you @fhussonnois!

pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
)

This is the 2nd PR for the KIP-307
Reviewers: Matthias J. Sax <mjsax@apache.org>,  John Roesler <john@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
@mjsax mjsax added the kip Requires or implements a KIP label Jun 12, 2020

Labels

kip Requires or implements a KIP streams

4 participants