
KAFKA-7301: Fix streams Scala join ambiguous overload #5502

Merged
guozhangwang merged 1 commit into apache:trunk from joan38:fix-join
Aug 21, 2018

Conversation

@joan38
Contributor

@joan38 joan38 commented Aug 14, 2018

join in the Scala streams API is currently unusable in 2.0.0 as reported by @mowczare:
#5019 (comment)

This is due to an overload of it with the same signature in the first curried parameter list.
See compiler issue that didn't catch it: https://issues.scala-lang.org/browse/SI-2628

I don't see many options here:

  • Keeping only one join with the Materialized as implicit, like we did with aggregate... (this is what the PR currently implements)
  • Renaming the materialized join to something like joinMat.
  • Uncurrying one or both of the two joins and losing the type inference.
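
To make the problem and the renaming option concrete, here is a minimal sketch. It uses toy stand-ins (`Table`, `Mat`) rather than the real KTable and Materialized, and every name in it is hypothetical. As far as I understand, Scala 2 chooses an overload from the first argument list only, so the two `ambiguousJoin` definitions compile but cannot be told apart at the call site.

```scala
object JoinOverloadSketch {
  // Toy stand-in for Materialized, NOT the real Kafka Streams type.
  final case class Mat(storeName: Option[String])

  // Toy stand-in for KTable, backed by an in-memory Map.
  final case class Table[K, V](rows: Map[K, V]) {
    // Both overloads share the first parameter list `(other)`. The definitions
    // compile, but under Scala 2 a call such as
    //   left.ambiguousJoin(right)((l, r) => l + r)
    // is reported as an ambiguous reference to an overloaded definition.
    def ambiguousJoin[VO, VR](other: Table[K, VO])(joiner: (V, VO) => VR): Table[K, VR] =
      joinRows(other, joiner)
    def ambiguousJoin[VO, VR](other: Table[K, VO])(joiner: (V, VO) => VR, mat: Mat): Table[K, VR] =
      joinRows(other, joiner)

    // Renaming option: two distinct names, so no overload has to be resolved.
    def join[VO, VR](other: Table[K, VO])(joiner: (V, VO) => VR): Table[K, VR] =
      joinRows(other, joiner)
    def joinMat[VO, VR](other: Table[K, VO])(joiner: (V, VO) => VR, mat: Mat): Table[K, VR] =
      joinRows(other, joiner)

    // Inner join on the keys present in both tables.
    private def joinRows[VO, VR](other: Table[K, VO], joiner: (V, VO) => VR): Table[K, VR] =
      Table(rows.keySet.intersect(other.rows.keySet).map(k => k -> joiner(rows(k), other.rows(k))).toMap)
  }

  val left = Table(Map("a" -> 1, "b" -> 2))
  val right = Table(Map("b" -> 10, "c" -> 20))

  // The lambdas need no type annotations because `other` fixes VO first.
  val joined = left.join(right)((l, r) => l + r)
  val joinedMat = left.joinMat(right)((l, r) => l + r, Mat(Some("hypothetical-store")))
}
```

The toy `Mat` is ignored by `joinRows`; the sketch is only about which call shapes the compiler accepts.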

I will add all the needed tests once we agree on an option.

The current workarounds are:

@debasishg
@ijuma
@guozhangwang
@mowczare

Thanks

@debasishg
Contributor

@joan38 Can we have some tests for this? Looks OK to me.

@mowczare
Contributor

mowczare commented Aug 14, 2018

It works for me. In my experience, I have never used any KTable join method without an explicit Materialized anyway. Just don't forget about the other join methods in the very same KTable.scala file:

  • def leftJoin
  • def outerJoin

They suffer from the same issue, although I didn't mention them in my report (my bad).

@joan38
Contributor Author

joan38 commented Aug 14, 2018

Good catch!
I have been thinking about refactoring the tests for a while; I will add that to this PR.
Thanks

Contributor

@vvcephei vvcephei left a comment

Hi @joan38 ,

Thanks for picking this back up. I have one question...

Contributor

Sorry if I'm being dense, but is it ok for Materialized to be implicit?

It seems to me that this would allow us to elide the parameter as long as there is a Materialized of the correct generic type in context, right?

But the Materialized builders aren't freely interchangeable just based on their generic types. In particular, if they have a name, then it would be incorrect to give the same name to every store in the topology. At first glance this seems like more room for error than I'm comfortable with for implicits.
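
A small sketch of that concern, using a toy `Mat` in place of Materialized (all names here are hypothetical): implicit resolution only looks at the type, so one named instance in scope is handed to every call whose type parameters match.

```scala
object ImplicitNameSketch {
  // Toy stand-in for Materialized: just the generic types and an optional store name.
  final case class Mat[K, V](storeName: Option[String])

  final class Table[K, V] {
    // Returns the store name the join would use, to make the resolution visible.
    def join(other: Table[K, V])(implicit mat: Mat[K, V]): Option[String] = mat.storeName
  }

  // A named instance placed in implicit scope, for example for convenience.
  implicit val named: Mat[String, Long] = Mat(Some("hypothetical-store"))

  val t1 = new Table[String, Long]
  val t2 = new Table[String, Long]
  val t3 = new Table[String, Long]

  // Two independent joins silently receive the same named instance.
  val storeNames: List[Option[String]] = List(t1.join(t2), t2.join(t3))
}
```

In this toy model both joins report the same store name, which is the collision described above.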

Contributor Author

You're not being dense.
The default Materialized given as implicit:

implicit def materializedFromSerde[K, V, S <: StateStore](implicit keySerde: Serde[K],

Doesn't specify any name or other configuration for the store. Therefore it would behave the same as if we didn't give one.
In fact we are doing this for aggregate and other transformations.
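
For readers unfamiliar with the pattern, here is a rough sketch of how such a default can be derived from implicit serdes. `Serde` and `Mat` below are toy stand-ins and `matFromSerde` is a hypothetical name; the real definition quoted above is truncated, so this is not a reconstruction of it.

```scala
object DefaultMaterializedSketch {
  // Toy stand-ins; the real Serde and Materialized carry much more.
  final case class Serde[T](label: String)
  final case class Mat[K, V](storeName: Option[String], keySerde: Serde[K], valueSerde: Serde[V])

  // Derives an anonymous Mat (no store name) from the serdes in implicit scope.
  implicit def matFromSerde[K, V](implicit ks: Serde[K], vs: Serde[V]): Mat[K, V] =
    Mat(None, ks, vs)

  implicit val stringSerde: Serde[String] = Serde("string")
  implicit val longSerde: Serde[Long] = Serde("long")

  // An operation that requires a Mat implicitly, like the aggregate case mentioned above.
  def storeNameFor[K, V](implicit mat: Mat[K, V]): Option[String] = mat.storeName

  // No Mat was defined by hand, so the derived, unnamed one is used.
  val derived: Option[String] = storeNameFor[String, Long]
}
```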

Contributor

Ok, I agree that's safe. I just had another thought reviewing this, though...

It's not immediately obvious that this would be the case, but join results are not materialized by default. Each side of the join must maintain the state of its stream, but since the final result can always be computed from the left and right stores, we don't need to materialize the join result.

So if I happen to have this implicit in scope, then all the joins in my topology will become materialized. Obviously, this will have some impact on resource usage.

I think that I actually have no way to specify that I don't want a join to be materialized in the current API.

Note that most of the other KTable operators are similar in behavior. For example: table.filter.filter.mapValues.toStream with no materializations actually only needs one state store (for the original table), but if we implicitly materialized them all, you'd wind up storing the data 4 times.

Contributor

Now, arguably this is a bug in the Java code, since AFAICT it's pointless to materialize the join if there's no queryable name.

Looking in KTableImpl, the join does this:

// only materialize if specified in Materialized
if (materializedInternal != null) {
    kTableJoinNodeBuilder.withMaterializedInternal(materializedInternal);
}

But the filter, for example, does this:

// only materialize if the state store is queryable
final boolean shouldMaterialize = materializedInternal != null && materializedInternal.isQueryable();

Maybe we should go ahead and fix this logic, and then the implicit would be fine.
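
The difference between the two checks can be sketched in a few lines. This is a toy model, not the KTableImpl code: `MaterializedInternal` here is a hypothetical case class, and `Option` stands in for the null checks in the Java snippets.

```scala
object MaterializeDecisionSketch {
  // Toy stand-in: a store is queryable only when it has been given a name.
  final case class MaterializedInternal(storeName: Option[String]) {
    def isQueryable: Boolean = storeName.isDefined
  }

  // Mirrors the join snippet: materialize whenever a Materialized was passed.
  def joinShouldMaterialize(m: Option[MaterializedInternal]): Boolean =
    m.isDefined

  // Mirrors the filter snippet: materialize only if the store is queryable.
  def filterShouldMaterialize(m: Option[MaterializedInternal]): Boolean =
    m.exists(_.isQueryable)

  // An anonymous Materialized, like the one an implicit default would supply.
  val anonymous: Option[MaterializedInternal] = Some(MaterializedInternal(None))
}
```

With the anonymous value, the join-style check says "materialize" while the filter-style check does not, which is the inconsistency under discussion.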

But I'm still not sure that it's great to require Materialized as an argument, effectively requiring the implicit as well, or a bunch of dummy arguments in my topology, just so they can be ignored later.

It's a bummer, because needing to support two variants means we can't do the currying approach at all, which means we can't benefit from the improved type inference.

Maybe the way out is just to have join and materializedJoin (and innerJoin and materializedInnerJoin, etc.). I don't know...

Contributor

@guozhangwang WDYT? Is there some benefit to always requiring (possibly anonymous) Materialized on joins?

I could see it being advantageous to make the serdes available for possible optimizations, but I don't know if this is the right mechanism for that anyway. I.e., you have to pass in something called "materialized", but it actually doesn't materialize anything; just registers serdes -- this seems confusing.

Contributor Author

@joan38 joan38 Aug 17, 2018

Indeed, if you bring in scope a Materialized that has a state store name configured, then it will be storing the data.

WDYT about a version like this:

def join[VO, VR](other: KTable[K, VO])(joiner: (V, VO) => VR)
def join[VO, VR](other: KTable[K, VO], materialized: Materialized[K, VR, ByteArrayKeyValueStore])(joiner: (V, VO) => VR)

Contributor

Also note that the aggregators (like in KGroupedTable) take an implicit Materialized, but in those cases, materialization is actually required, even if it's anonymous.

Contributor Author

@joan38 joan38 Aug 17, 2018

@vvcephei The only places where we've expected an implicit materialized: Materialized are in count()s and in aggregate()s (where it actually makes sense to require them implicitly), so I don't see why in your example table.filter.filter.mapValues.toStream it would materialize 4 times?

Contributor

Sorry, my comment was ambiguous. I was just saying that such would be the result if we make materialized implicit across the board in KTable.

@vvcephei
Contributor

And to @mowczare 's point, I think we can add some "api shape" tests. That is, we don't really need to test any of the behavioral aspects of KStreams, but we should write tests that prove the API has the right shape when it's actually used.

For example, we can create a set of mock implementations for the Java API that just send you to the correct next type (like ApiMockKTable#toStream() would do nothing but return new ApiMockKStream()), etc.

Then, we should be able to initialize the scala facades to wrap the mocks and proceed to write any topologies we want.

I think there's not even any need to assert any conditions. We'd just be checking that the desired source code does indeed compile.

What do you think?
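
A rough sketch of what such an "api shape" test could look like. It is simplified: instead of wrapping Java mocks in the Scala facades, the hypothetical mock classes below expose Scala-style signatures directly, and each method only returns the next type.

```scala
object ApiShapeSketch {
  // Hypothetical mocks: no behavior, each method just returns the next type.
  final class ApiMockKStream[K, V]

  final class ApiMockKTable[K, V] {
    def filter(predicate: (K, V) => Boolean): ApiMockKTable[K, V] = new ApiMockKTable[K, V]
    def mapValues[VR](mapper: V => VR): ApiMockKTable[K, VR] = new ApiMockKTable[K, VR]
    def join[VO, VR](other: ApiMockKTable[K, VO])(joiner: (V, VO) => VR): ApiMockKTable[K, VR] =
      new ApiMockKTable[K, VR]
    def toStream: ApiMockKStream[K, V] = new ApiMockKStream[K, V]
  }

  // The "test" is simply that this topology type-checks without annotating the lambdas.
  def topology(left: ApiMockKTable[String, Int],
               right: ApiMockKTable[String, Long]): ApiMockKStream[String, String] =
    left
      .filter((k, v) => v > 0)
      .join(right)((l, r) => l + r)
      .mapValues(sum => sum.toString)
      .toStream
}
```

If a signature change broke inference or introduced an ambiguous overload, `topology` would stop compiling, with no assertions needed.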

@joan38
Contributor Author

joan38 commented Aug 15, 2018

You are right that we can mock the Java API, but if we go with static (compile-time only) tests then there is no need for a mock.
I will add those tests soon and see if I do one of your 2 options.
Thanks

@vvcephei
Contributor

Ok! Either way; I was just throwing the idea out there.

@guozhangwang
Contributor

This option looks good to me too. Since KTable.join() seems not to be usable in 2.0.0, I think there are no compatibility concerns anyway.

@joan38
Contributor Author

joan38 commented Aug 15, 2018

@vvcephei That's always welcome 😄

I just pushed a WIP of test implementation.
In the end I went for the TopologyTestDriver, since I remember static tests wouldn't have caught some casting issues we had in the past.

Tomorrow I will finish the test for join so that we can merge this PR. And in a separate PR continue to add testing to cover the rest of the library.

@mjsax mjsax changed the title Fix streams Scala join ambiguous overload KAFKA-7301: Fix streams Scala join ambiguous overload Aug 16, 2018
@joan38 joan38 force-pushed the fix-join branch 3 times, most recently from 905ba28 to b0f8bd6 Compare August 16, 2018 22:23
@joan38
Contributor Author

joan38 commented Aug 16, 2018

Please review the added tests.
This is ready to be merged from my side.
I will add more tests in a separate PR in an effort to cover the whole wrapper.

@guozhangwang
Contributor

Jenkins fails due to missing license headers:

22:48:35 Unknown license: /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk10-scala2.12/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KStreamTest.scala
22:48:35 Unknown license: /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk10-scala2.12/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/KTableTest.scala
22:48:35 Unknown license: /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk10-scala2.12/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/StreamToTableJoinTestData.scala
22:48:35 Unknown license: /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk10-scala2.12/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/utils/TestDriver.scala
22:48:35 

@joan38 joan38 force-pushed the fix-join branch 2 times, most recently from e4920eb to c289d39 Compare August 17, 2018 07:58
Contributor Author

Is it good to put my name here?


I'm pretty sure this is binary compatible.

@vvcephei
Contributor

Flaky API tests.

Retest this, please.

@joan38
Contributor Author

joan38 commented Aug 17, 2018

From @vvcephei's great comment #5502 (comment), it sounds like with this change we may unintentionally start to persist in a state store if it turns out that we have an implicit Materialized with a store name set in scope.
Also, the only reason we make this Materialized implicit is to solve a function overloading problem, so semantically it doesn't sound right.

So I'm thinking about the following change instead:

def join[VO, VR](other: KTable[K, VO])(joiner: (V, VO) => VR)
def join[VO, VR](other: KTable[K, VO], materialized: Materialized[K, VR, ByteArrayKeyValueStore])(joiner: (V, VO) => VR)

The drawback is that it introduces a small reordering of parameters compared to the Java API, but I think it's not too disturbing and at least it's semantically correct.
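
A minimal sketch of how the two proposed signatures behave at the call site, with toy `Table` and `Mat` types standing in for KTable and Materialized (names and the in-memory join are assumptions for illustration only). Because the overloads now differ in the first parameter list, the compiler can pick one before it looks at the joiner.

```scala
object ReorderedJoinSketch {
  // Toy stand-in for Materialized[K, VR, ByteArrayKeyValueStore].
  final case class Mat[K, VR](storeName: String)

  // `storeName` records whether the caller asked for the result to be materialized.
  final case class Table[K, V](rows: Map[K, V], storeName: Option[String] = None) {
    def join[VO, VR](other: Table[K, VO])(joiner: (V, VO) => VR): Table[K, VR] =
      Table(joinRows(other, joiner))

    def join[VO, VR](other: Table[K, VO], materialized: Mat[K, VR])(joiner: (V, VO) => VR): Table[K, VR] =
      Table(joinRows(other, joiner), Some(materialized.storeName))

    // Inner join on the keys present in both tables.
    private def joinRows[VO, VR](other: Table[K, VO], joiner: (V, VO) => VR): Map[K, VR] =
      rows.keySet.intersect(other.rows.keySet).map(k => k -> joiner(rows(k), other.rows(k))).toMap
  }

  val left = Table(Map("a" -> 1, "b" -> 2))
  val right = Table(Map("b" -> 10, "c" -> 20))

  // One argument in the first list selects the non-materialized overload.
  val plain = left.join(right)((l, r) => l + r)

  // Two arguments select the materialized overload; VR is fixed by Mat[String, Int].
  val stored = left.join(right, Mat[String, Int]("hypothetical-store"))((l, r) => l + r)
}
```

Neither call needs type annotations on the joiner, and nothing is materialized unless a `Mat` is passed explicitly.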

WDYT?

@debasishg
Contributor

@joan38 In the original implementation we did not have the implicit: https://github.com/lightbend/kafka-streams-scala/blob/develop/src/main/scala/com/lightbend/kafka/scala/streams/KTableS.scala#L47-L53 The API was something like what you have here.

@joan38
Contributor Author

joan38 commented Aug 17, 2018

@debasishg Indeed, given @vvcephei's comments, I no longer think the implicit is a good idea.
But I also think it is not OK for a Scala API to lack full type inference.
This is why I suggest using something like:

def join[VO, VR](other: KTable[K, VO])(joiner: (V, VO) => VR)
def join[VO, VR](other: KTable[K, VO], materialized: Materialized[K, VR, ByteArrayKeyValueStore])(joiner: (V, VO) => VR)

No implicit, just overloading, with the trade-off of reordering the parameters a bit.

@debasishg
Contributor

@joan38 I agree with the type inference thingy. Previously we did it for compatibility with the Java APIs. But I think we already had this discussion of making a curried argument for better type inference. I was just pointing to the implicit stuff.

@vvcephei
Contributor

@joan38,

Part of the goal with the multiple arg lists is to make the type system correctly infer the type parameters, right?

If I understand this properly,

  • The KTable itself (this) determines K and V (these are fixed already).
  • other determines VO. It gets its own param list so that VO is fixed for the joiner.
  • joiner determines VR. Putting it in its own param list would fix VR for materialized.

Is that right?
So if we move materialized up, we would have to redundantly specify the generics on materialized to match the result type of joiner. It feels more natural to me to have joiner determine the result type, but I don't know if this makes much of a difference.
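
The inference point can be sketched with a toy `Table` (hypothetical, not the real KTable). As far as I understand, Scala 2 solves type parameters one parameter list at a time, so a joiner that shares a list with `other` needs annotated parameters, while a joiner in its own list does not.

```scala
object InferenceSketch {
  final case class Table[K, V](rows: Map[K, V]) {
    // Single parameter list: VO is not yet fixed when the lambda is typed.
    def joinUncurried[VO, VR](other: Table[K, VO], joiner: (V, VO) => VR): Table[K, VR] =
      Table(joinRows(other, joiner))

    // Two parameter lists: `other` fixes VO before the joiner is typed.
    def joinCurried[VO, VR](other: Table[K, VO])(joiner: (V, VO) => VR): Table[K, VR] =
      Table(joinRows(other, joiner))

    // Inner join on the keys present in both tables.
    private def joinRows[VO, VR](other: Table[K, VO], joiner: (V, VO) => VR): Map[K, VR] =
      rows.keySet.intersect(other.rows.keySet).map(k => k -> joiner(rows(k), other.rows(k))).toMap
  }

  val left = Table(Map("a" -> 1))
  val right = Table(Map("a" -> 2L))

  // Annotated lambda parameters, which Scala 2 requires for the uncurried form.
  val viaUncurried = left.joinUncurried(right, (l: Int, r: Long) => l + r)

  // No annotations needed; VR is inferred from the joiner's result type.
  val viaCurried = left.joinCurried(right)((l, r) => l + r)
}
```

In both calls VR is inferred as Long from the joiner, which matches the "joiner determines VR" reading above.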

Also, because of the structure of the Java Materialized builder, it forgets its type parameters part of the time anyway, so it might be moot (i.e., it might be necessary to specify those type parameters even if we keep materialized in its own param list at the end). Unless, of course, we also provide a scala builder for Materialized that includes generic type witnesses. (I've been out of the scala game for a while, so I don't remember whether this would be Manifest, TypeTag, or ClassTag)

This might be an ergonomic improvement anyway.

What are your thoughts?

@joan38
Contributor Author

joan38 commented Aug 17, 2018

@vvcephei The other solution, joinMaterialized, solves this issue, but I guess in practice we define Materialized outside of the function call:

val materialized = Materialized.as("my-name").withKeySerde(someSerde).withValueSerde(someSerde)
join(otherStream, materialized)((a, b) => a + b)

I don't really see the point of inferring the result type.

Contributor

@vvcephei vvcephei left a comment

@joan38 Thanks for this PR.

Those tests are super clean. I love it!

I had two nitpicks regarding the test names, and one question about the copyright. I'll see if I can dig up someone to answer that last one.

Contributor

nit: KTables

Contributor

nit: KTables

Contributor

I'm not sure of the implications of adding these copyright lines, or whether it's proper to remove the prior lines. Maybe there's an armchair OS lawyer out there who can comment.

Contributor Author

Humm that one was not intended, I will revert that.

@joan38
Contributor Author

joan38 commented Aug 17, 2018

Ok so the first commit is a proposed fix for joins.

Now I'm thinking that the implicit Materialized on count(), reduce() and aggregate() that I introduced in #5066 was not a great idea because it suggests that the user should define an implicit Materialized. Whereas the reason why we put that implicit was to avoid having no serdes set, especially in the aggregate() case, and then count() and reduce() to make the API consistent.
So the second commit is a proposal to revert that and use Serdes instead and form the Materialized from them.
The question is: now that 2.0 is released, are we willing to make such a change?

Contributor Author

Do you guys understand why this is 3 instead of 2?
The window should ditch the first event no?

Contributor Author

@joan38 joan38 Aug 19, 2018

It turns out advanceWallClockTime is not enough; I need to change the timestamps of the events.

@joan38 joan38 force-pushed the fix-join branch 6 times, most recently from 6e18c3c to fc3d0f1 Compare August 20, 2018 00:47
@joan38
Contributor Author

joan38 commented Aug 20, 2018

While doing more testing I discovered that filter on KTable was broken too: it was recursively calling itself.
I'm slowly but surely adding more tests.
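
For illustration, here is one plausible shape of that kind of wrapper bug, in a toy model. The exact broken code is not quoted in this thread, so the commented-out line is an assumption; `InnerTable` and `Table` are hypothetical stand-ins for the Java KTable and its Scala wrapper.

```scala
object WrapperDelegationSketch {
  // Toy stand-in for the wrapped Java table.
  final class InnerTable[K, V](val rows: Map[K, V]) {
    def filter(predicate: (K, V) => Boolean): InnerTable[K, V] =
      new InnerTable(rows.filter { case (k, v) => predicate(k, v) })
  }

  // Toy stand-in for the Scala wrapper.
  final class Table[K, V](val inner: InnerTable[K, V]) {
    // Assumed buggy shape (left commented out): the call resolves to this very
    // method instead of the wrapped one, so it recurses forever.
    //   def filter(predicate: (K, V) => Boolean): Table[K, V] = filter(predicate)

    // Intended shape: delegate explicitly to the wrapped instance.
    def filter(predicate: (K, V) => Boolean): Table[K, V] =
      new Table(inner.filter(predicate))
  }

  val kept: Map[String, Int] =
    new Table(new InnerTable(Map("a" -> 1, "b" -> -1))).filter((k, v) => v > 0).inner.rows
}
```

A test that actually runs the call, rather than only compiling it, is what catches this class of bug.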

@vvcephei
Contributor

Hey @joan38 ,

I hope this isn't too discouraging, but since your prior PR is already released, we need to do a fresh KIP to update the API again.

We're skirting that for the original scope of this PR, since it's actually impossible to use those methods in their current form, so we know that no one is.

However, if I understand correctly, count() and company are actually usable, it's just that we wish the API were slightly different. You can still propose such a change, but I think it should be in a new PR so that we can go ahead and merge this one. (And that new PR wouldn't be mergeable until it has a KIP)

Actually, now that I'm thinking about it, it might be nice to also just create a new PR for KTable#filter. It's just much easier for the reviewers to consider smaller, more focused PRs.

I do very much appreciate these tests and improvements; I'm just trying to smooth the way for them to get merged.

Thanks!
-John

@joan38
Contributor Author

joan38 commented Aug 20, 2018

Makes perfect sense.
The PR has been updated.

@vvcephei
Contributor

Thanks! I'll take another look tomorrow.

Contributor

@vvcephei vvcephei left a comment

@guozhangwang This LGTM now. I guess we also need to cherry pick it to 2.0.

@mjsax mjsax added the streams label Aug 21, 2018
Contributor

@guozhangwang guozhangwang left a comment

Thanks @joan38 !!

@guozhangwang guozhangwang merged commit dae9c41 into apache:trunk Aug 21, 2018
guozhangwang pushed a commit that referenced this pull request Aug 21, 2018
Join in the Scala streams API is currently unusable in 2.0.0 as reported by @mowczare:

This is due to an overload of it with the same signature in the first curried parameter list.
See compiler issue that didn't catch it: https://issues.scala-lang.org/browse/SI-2628

Reviewers: Debasish Ghosh <dghosh@acm.org>, Guozhang Wang <guozhang@confluent.io>, John Roesler <john@confluent.io>

minor
@guozhangwang
Contributor

Cherry-picked to 2.0 as well, with some minor fixes to resolve conflicts.

@joan38 joan38 deleted the fix-join branch August 23, 2018 09:22
@joan38
Contributor Author

joan38 commented Aug 23, 2018

@vvcephei @guozhangwang Could you guys give me KIP creation access?
When I follow the instructions on https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals I get a Not found.
My account: https://cwiki.apache.org/confluence/display/~joan

@guozhangwang
Contributor

@joan38 I've granted you the access.

@joan38
Contributor Author

joan38 commented Aug 23, 2018

Thanks @guozhangwang

pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019