Skip to content

KAFKA-5488: Add type-safe split() operator#9107

Merged
mjsax merged 22 commits intoapache:trunkfrom
inponomarev:kip-418
Feb 5, 2021
Merged

KAFKA-5488: Add type-safe split() operator#9107
mjsax merged 22 commits intoapache:trunkfrom
inponomarev:kip-418

Conversation

@inponomarev
Copy link
Copy Markdown
Contributor

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@inponomarev
Copy link
Copy Markdown
Contributor Author

inponomarev commented Jul 30, 2020

⚠️ Two differences with KIP specification, discussion needed⚠️

  1. Instead of multiple overloaded variants of Branched.with we now have Branched.withFunction and Branched.withConsumer. This is because of compiler warnings about overloading (Function and Consumer being indistinguishable when supplied as lambdas)

  2. 'Fully covariant' signatures like Consumer<? super KStream<? super K, ? super V>> don't work as expected. Used Consumer<? super KStream<K, V>> instead

@vvcephei vvcephei requested a review from mjsax July 31, 2020 20:53
@vvcephei
Copy link
Copy Markdown
Contributor

Hey @mjsax , do you have time to give this a first pass?

@mjsax
Copy link
Copy Markdown
Member

mjsax commented Aug 4, 2020

I'll put it into my backlog. But I am the main reviewer for two other KIPs (216 and 466) that I should review first as they got approve earlier and PRs are open for longer already.

@mjsax mjsax changed the title KAFKA-5488: KIP-418 implementation KAFKA-5488: Add type save branch() operator Dec 23, 2020
Copy link
Copy Markdown
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @inponomarev and sorry for the long wait for a review

Many comments are about JavaDocs, so it's mostly small suggestions. A few comments about the code structure are there, too.

Comment thread streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java Outdated
Comment thread streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java Outdated
Comment thread streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java Outdated
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit supposed -> used ?

this function -> the provided function

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I am wondering if we should allow to pass in null? Thoughts?

Copy link
Copy Markdown
Contributor Author

@inponomarev inponomarev Dec 24, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my reply below, where we discuss null consumers: #9107 (comment)

(in short: I agree, I think we shouldn't)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a non-null branch is provided here? (branch -> consumer?)

But I would propose to simplify it, and just use By default (as passing in a non-null consumer should be the "default" usage).

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above, should we even allow a not-null consumer?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we call branch((k,v) -> true, branched) instead to just add a predicate and branch? This way, the default branch is nothing special at runtime any longer.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default branch should have index 0 (so it will be stable when branches are added or removed), but it should always be checked after all other branches. And when we come to the default branch during message processing, there is actually no need in dereferncing a predicate and calling test... that's why I treat the default branch differently.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it's fine both ways. -- The point about the index is a good one that I missed. But would still be doable I guess.

I don't think that there would be any measurable runtime difference if you use a "default predicate" (what we also do in the current implementation) -- the code is just a little "cleaner" as we don't need an extra "if" at the end -- but it's also not the end of the world as the process method is fairly simply anyway.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

result -> outputBranches ?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if it might be better to move this code into a build method that would be called within defaultBranch() / noDefaultBranch() ?

The pattern to pass in empty list that we modify later seems undesirable, and we should first build the list, and than pass them in -- otherwise, we make assumptions how ProcessorParameters and ProcessorGraphNode might be implemented what we should avoid.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I clearly remember that something made me to write it this way, but I have to recall...

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would love to learn about it. -- In general, it's easier to follow the same pattern throughout the code base. It easier to reason about the code that way, and also easier for people to learn the code base.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just saw your other comment: #9107 (comment)

@inponomarev
Copy link
Copy Markdown
Contributor Author

Hi @mjsax, thanks for your thorough revew! I have fixed everything according to your comments, except:

@inponomarev
Copy link
Copy Markdown
Contributor Author

OK @mjsax concerning #9107 (comment) I remembered why it was implemented this way!

The problem is that it is not necessary to invoke defaultBranch() / noDefaultBranch() when we use consumers, like in this simple example (I just added a new unit test for this case):

source.split()
    .branch(isCoffee, Branched.withConsumer(issuer::setCoffeePurchases))
    .branch(isElectronics, Branched.withConsumer(issuer::setElectronicsPurchases));

@mjsax mjsax changed the title KAFKA-5488: Add type save branch() operator KAFKA-5488: Add type-safe branch() operator Dec 29, 2020
@mjsax
Copy link
Copy Markdown
Member

mjsax commented Dec 29, 2020

About the original comment: #9107 (comment)

I am fine with those changes.

About #9107 (comment) -- that is a good point. Thanks for explaining. I guess it's a "philosophical" question if we want to allow this pattern though, or if we want to require that either defaultBranch() or noDefaultBranch() is called? -- I did consider calling branch() like a builder pattern, and the final [noD|d]efaultBranch call is basically build()?

Curious to hear what @vvcephei thinks about it.

@mjsax
Copy link
Copy Markdown
Member

mjsax commented Dec 29, 2020

@inponomarev -- Can you also update the docs for Kafka Streams and the 2.8 upgrade guide in this PR.

@inponomarev inponomarev changed the title KAFKA-5488: Add type-safe branch() operator KAFKA-5488: Add type-safe split() operator Dec 29, 2020
@inponomarev
Copy link
Copy Markdown
Contributor Author

Can you also update the docs for Kafka Streams and the 2.8 upgrade guide in this PR.

The documentation had been already updated (see changes in docs/streams/developer-guide/dsl-api.html)

I also modified docs/upgrade.html -- should I add something more here, like code examples?

Another question: CI checks fail because of usage of deprecated branch method in streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala. Since I'm not a Scala user, I have no idea of what should be done here.

Most likely we should deprecate the branch method and add a wrapper for the new split method, but I don't know how to do this correctly.

@mjsax
Copy link
Copy Markdown
Member

mjsax commented Dec 29, 2020

To make the build pass, for now, it should be sufficient to just deprecate the method via @nowarn("cat=deprecation") -- But it seems we should update the Scala API, too. If you cannot handle it, we can do a follow up PR.

It seems, we need to add split() to KStream.scala and introduce a new BranchedKStream.scala and Branch.scala classes and maybe some translations from Java Consumer/Function to their Scala variants. But I also don't really now Scala; @vvcephei should nkow better.

@inponomarev
Copy link
Copy Markdown
Contributor Author

As far as I can judge from the name, @nowarn is not for deprecation, but rather for a warning suppression 🤔 apparently we need to mirror the changes in Java KStream interface here. Never wrote anything in Scala before. OK, it's better to wait for @vvcephei !

@mjsax
Copy link
Copy Markdown
Member

mjsax commented Dec 30, 2020

Maybe I miss-understood you question. I thought the build fails because we are using some deprecated method -- for this case, we can make the build pass by suppressing the warning. If you want to deprecate a method in the Scala API, you just add @deprecated similar to Java. -- I guess it makes sense to also deprecate the KStream.scala#branch() method, but suppressing the warning should also make the build pass and we can deprecate this method when we add the new split() method.

@mjsax mjsax added the kip Requires or implements a KIP label Jan 7, 2021
@vvcephei
Copy link
Copy Markdown
Contributor

vvcephei commented Jan 8, 2021

Hey @inponomarev and @mjsax ! I'm glad to see this is moving along.

Regarding #9107 (comment) :

My understanding was that defaultBranch/noDefaultBranch were the terminal operators, in that they close out the context of a BranchedKStream, and you can't add any more branches after one of those methods.

But also, the whole branching construct is an incremental builder like the rest of the Kafka Streams API. In other words, just like this is a valid program:

builder.stream("input")
       .filter(myPredicate)

so would be Ivan's example:

builder.split()
       .branch("myBranch", ...)

What I mean by "incremental builder" is that each time you call a chained method in the DSL, it immediately adds nodes to the program, as opposed to having to call any kind of build() method to actually add stuff to the program. I think there are pros and cons to this design, but it seems more in line with the rest of the DSL not to require the terminal operators.

@inponomarev
Copy link
Copy Markdown
Contributor Author

Hi @vvcephei , thank you for your comment. There's another question that we were unable to solve without you -- see #9107 (comment) from the words 'CI checks fail' and further discussion. Can you clarify, what's expected from KStream.scala ?

@mjsax
Copy link
Copy Markdown
Member

mjsax commented Jan 13, 2021

@vvcephei -- hope you are also ok with the proposed changes to the KIP as per the PR description on top: #9107 (comment)

@inponomarev
Copy link
Copy Markdown
Contributor Author

@vvcephei @mjsax I added full Scala wrapper for the new API: split method, BranchedKStream and Branched. Also added Scala unit tests that verify main use cases

@vvcephei
Copy link
Copy Markdown
Contributor

Hey @inponomarev , I just took a look at the Scala API. Thanks for adding that!

I figured it'd be just easier to push a few tweaks than to describe what needs to be done.

  • You asked me offline if we could avoid the overloads in Branched, and indeed, we can with a default argument of null for the name.
  • The Scala test was inadvertently using the Java Branched class, but you meant to test the Scala one.
  • I happened to notice a small typo: The opposite of "prefix" is "suffix", not "postfix"
  • I also noticed that your files all had CRLF (windows) return characters, so I fixed them. You might want to configure git for autocrlf (git config --global core.autocrlf true) (see https://www.git-scm.com/book/en/v2/Customizing-Git-Git-Configuration)

These are all separate commits above, so you can scrutinize each one. This PR is your work, so feel free to protest any of my suggestions.

@inponomarev
Copy link
Copy Markdown
Contributor Author

Hi @vvcephei thank you for your commits! Is everything else OK, especially #9107 (comment)?

@mjsax I pushed small fixes to Javadoc/Scaladoc, and AFAICS only tests not related to the changes are failing.

@vvcephei
Copy link
Copy Markdown
Contributor

Thanks @inponomarev ,

Ah, I didn't notice that method signature name. I actually prefer it this way :)

Thanks also for pointing out the covariance change. This is also fine. Java's type system only contains a partial implementation of variance, so we do best we can.

Did you already update the KIP? If not, please do.

I'm +1 on this PR.

@inponomarev
Copy link
Copy Markdown
Contributor Author

Thank you @vvcephei, I have updated the KIP and now it reflects the actual implementation.

I just wasn't sure if it's ok to edit specification text after it has been formally approved :-)

@inponomarev
Copy link
Copy Markdown
Contributor Author

Hi @mjsax , I have rebased and manually merged conflicts, and also removed FunctionConverters

JDK8 build still fails, but this time much later -- something related to integration testing

@mjsax
Copy link
Copy Markdown
Member

mjsax commented Feb 3, 2021

Wait failure do you see exactly? Seem Jenkins in still running.

@inponomarev
Copy link
Copy Markdown
Contributor Author

inponomarev commented Feb 3, 2021

I was talking about build 17 (triggered by Commit db573f5, see https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-9107/)

Where did build 18 come from, why did it take 8 hours and then timed out -- I can't understand 😃

@mjsax
Copy link
Copy Markdown
Member

mjsax commented Feb 3, 2021

Ah I see -- well, we do have from flaky tests, so nothing to worry about I guess. The last run timed out, so I retriggered the build. However, I could build it locally with Java8/Scale 2.12 and so I guess we can merge. Just waiting for @vvcephei to take a quick look at the last Scala commit.

@vvcephei
Copy link
Copy Markdown
Contributor

vvcephei commented Feb 4, 2021

Thanks, all, that Scala fix looks perfect to me.

@mjsax mjsax merged commit 5552da3 into apache:trunk Feb 5, 2021
@mjsax
Copy link
Copy Markdown
Member

mjsax commented Feb 5, 2021

Merged to trunk.

Congrats for getting this into the 2.8.0 release @inponomarev -- great work!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

kip Requires or implements a KIP streams

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants