Count fix and Type alias refactor in Streams Scala API #4966

Merged
guozhangwang merged 1 commit into apache:trunk from joan38:materialized on May 11, 2018

Contributor

@joan38 joan38 commented May 3, 2018

Provide a tiny helper for creating Materialized with implicit Serdes.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Contributor Author

joan38 commented May 8, 2018

@guozhangwang @debasishg Any thoughts on this?

Contributor

I think ByteArrayKeyValueStore is more succinct than KeyValueStore[Bytes, Array[Byte]]? cc @debasishg @seglo for a second thought.

Contributor

+1 @guozhangwang .. exactly for this reason the type synonym was introduced

Contributor Author

I'm good with the type alias ByteArrayKeyValueStore but then we should do it consistently for the other types like SessionStore[Bytes, Array[Byte]]. I will then modify the PR to add the others.
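
As a rough illustration of the consistent aliasing suggested here, the sketch below defines one alias per store kind. The traits are hypothetical stand-ins so the snippet compiles without Kafka on the classpath, and the names ByteArraySessionStore and ByteArrayWindowStore are assumed by analogy with ByteArrayKeyValueStore; the thread itself only names the key-value alias.

```scala
object StoreAliases {
  // Stand-in types (assumption): in Kafka Streams these would be
  // org.apache.kafka.common.utils.Bytes and the store interfaces
  // in org.apache.kafka.streams.state.
  final case class Bytes(get: Array[Byte])
  trait KeyValueStore[K, V]
  trait SessionStore[K, V]
  trait WindowStore[K, V]

  // One alias per store kind, all following the same naming pattern.
  type ByteArrayKeyValueStore = KeyValueStore[Bytes, Array[Byte]]
  type ByteArraySessionStore  = SessionStore[Bytes, Array[Byte]]
  type ByteArrayWindowStore   = WindowStore[Bytes, Array[Byte]]

  // An alias is only another name for the same type, so a value typed
  // with the alias can be passed where the long form is expected.
  def describe(store: SessionStore[Bytes, Array[Byte]]): String = "session store"

  val aliased: ByteArraySessionStore = new SessionStore[Bytes, Array[Byte]] {}
  val description: String = describe(aliased)
}
```

Because the aliases are fully interchangeable with the spelled-out types, introducing them should not change any signature at the bytecode level, only how the signatures read.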

Contributor

I would be defensive about implicits on Materialized. First, this abstraction doesn't add much value, since it returns the Java abstraction. Second, if someone doesn't wish to pass a serde to a Materialized and uses Materialized.as instead, serdes can still be passed inadvertently if implicits are in scope. This will lead to errors that are difficult to debug.
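
The concern can be sketched with stand-in types. Serde, Materialized, as and withImplicits below are all hypothetical simplifications, not the real Kafka classes or the helper from this PR; the point is only that an implicit-based builder silently picks up whatever serdes happen to be in scope.

```scala
object ImplicitPickup {
  // Hypothetical stand-ins for Serde and Materialized.
  final case class Serde[T](name: String)
  final case class Materialized[K, V](
      storeName: String,
      keySerde: Option[Serde[K]],
      valueSerde: Option[Serde[V]])

  // Explicit builder: no serdes are set, loosely mirroring a plain Materialized.as(...).
  def as[K, V](storeName: String): Materialized[K, V] =
    Materialized(storeName, None, None)

  // Hypothetical implicit-based helper: serdes come from the implicit scope.
  def withImplicits[K, V](storeName: String)(
      implicit k: Serde[K], v: Serde[V]): Materialized[K, V] =
    Materialized(storeName, Some(k), Some(v))

  // Implicits that happen to be in scope at the call site.
  implicit val stringSerde: Serde[String] = Serde("string")
  implicit val longSerde: Serde[Long] = Serde("long")

  val explicitOnly: Materialized[String, Long] = as[String, Long]("counts")
  // Nothing at this call site shows that two serdes are being attached.
  val viaImplicits: Materialized[String, Long] = withImplicits[String, Long]("counts")
}
```

In this sketch the two call sites look almost identical, yet only one of them carries serdes, which is the kind of hard-to-trace difference the comment warns about.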

Member

seglo commented May 9, 2018

I don't believe there's much purpose in abstracting Materialized to help with implicit SerDes now that the internal changes to propagate the SerDes for each of these aggregation operators have been completed in #4919. As @guozhangwang mentioned in that PR:

This is a meta explanation about the serde inheritance across multiple internal impl classes:

  • reduce: inherit the key and value serdes from the parent XXImpl class.
  • count: inherit the key serdes, enforce setting the Serdes.Long() for value serdes.
  • aggregate: inherit the key serdes, do not set for value serdes internally (line 92 here is for this case).
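
The three rules quoted above can be modelled in a few lines. This is a toy model under stated assumptions, not the internal Kafka implementation: a serde is represented only by its name, and Serdes, Operator and inherit are hypothetical names.

```scala
object SerdeInheritance {
  // Hypothetical model: each serde is represented by its name only.
  final case class Serdes(key: Option[String], value: Option[String])

  sealed trait Operator
  case object Reduce extends Operator
  case object Count extends Operator
  case object Aggregate extends Operator

  // Serdes the operator's result starts with, given the parent's serdes.
  def inherit(parent: Serdes, op: Operator): Serdes = op match {
    case Reduce    => parent                            // key and value inherited
    case Count     => Serdes(parent.key, Some("Long"))  // value forced to Long
    case Aggregate => Serdes(parent.key, None)          // value left unset
  }

  val parent: Serdes = Serdes(Some("String"), Some("String"))
  val forCount: Serdes = inherit(parent, Count)
}
```

Under this model only aggregate leaves the value serde for the caller to supply, which matches the case singled out in the quoted explanation.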

Contributor Author

joan38 commented May 9, 2018

Indeed, this doesn't bring much value, so I removed the Materialized helper and left only some refactorings.

@joan38 joan38 changed the title from "Add Materialized helper with implicit Serdes in Kafka Streams Scala" to "Minor type alias refactor" May 9, 2018
@joan38 joan38 force-pushed the materialized branch 2 times, most recently from 43aa5bb to 41cac50 May 9, 2018 18:53
Contributor

I'm just curious: why can we drop the Long2long conversion now?

Contributor Author

Not sure why it's been done in the first place

Contributor

@guozhangwang guozhangwang May 10, 2018

We need to double check if Scala 2.11 is compatible with java / scala Long (I'm not an expert in this case..)

If that is indeed the case, could the above count() overloaded function be updated as well?

Contributor Author

@joan38 joan38 May 10, 2018

There shouldn't be any difference between 2.12 and 2.11 here. Both are just doing boxing and unboxing in Scala long.

I'll write a test to make that clear tomorrow.

Contributor Author

Indeed, this change is faulty. I reverted it and added a test to make sure we don't run into this again in the future.

Contributor

Actually the other overload def count(): KTable[K, Long] doesn't compile without the mapValues. This tricked us into thinking that we need to do this for this overload as well. Possibly the type parameter Long in Materialized helps the compiler to infer the return type of Long and make the conversion. So we still need to do the conversion manually in the other overload of count.
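
For readers unfamiliar with the distinction, here is a minimal sketch of the manual conversion being discussed. It uses a plain Scala Map as a stand-in for the table returned by the Java count(), which is an assumption made to keep the snippet free of Kafka dependencies; Long2long itself is the standard conversion from scala.Predef.

```scala
object LongConversion {
  // Stand-in for a Java result whose values are boxed java.lang.Long.
  val javaCounts: Map[String, java.lang.Long] =
    Map("a" -> java.lang.Long.valueOf(2L), "b" -> java.lang.Long.valueOf(5L))

  // Map[String, java.lang.Long] and Map[String, Long] are distinct static
  // types, so each value is converted explicitly with Predef.Long2long.
  val scalaCounts: Map[String, Long] =
    javaCounts.map { case (key, value) => key -> Long2long(value) }

  val total: Long = scalaCounts.values.sum
}
```

In the Scala wrapper the same per-value conversion would be applied through mapValues on the returned table rather than on a Map.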

Contributor

@guozhangwang guozhangwang left a comment

Other parts LGTM. Left two more minor comments.

Contributor

Should we revert the change here as well?

Contributor Author

Indeed we should

Contributor

Ditto here.

@guozhangwang guozhangwang changed the title from "Minor type alias refactor" to "MINOR: Type alias refactor in Streams Scala API" May 11, 2018
   * @see `org.apache.kafka.streams.kstream.KGroupedTable#count`
   */
  def count(materialized: Materialized[K, Long, ByteArrayKeyValueStore]): KTable[K, Long] =
    inner.count(materialized)
Contributor Author

This one would have triggered a StackOverflowError by calling itself in a loop.
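
One plausible mechanism for such a loop, sketched with hypothetical stand-in classes rather than the real Kafka types: when the argument type does not match the Java method, the compiler may resolve the call through an implicit conversion of inner back to the Scala wrapper, so the method ends up calling itself. Box, JavaCounter, ScalaCounter and wrap are all assumed names, and whether this is exactly what happened in the PR is an assumption.

```scala
import scala.language.implicitConversions

object ImplicitLoop {
  // Invariant container, playing the role of Materialized.
  final class Box[V](val value: V)

  // Stand-in for the Java class: it only accepts the boxed java.lang.Long.
  class JavaCounter {
    def count(box: Box[java.lang.Long]): String = "java count"
  }

  class ScalaCounter(inner: JavaCounter) {
    // Box[Long] is not a Box[java.lang.Long], so JavaCounter.count is not
    // applicable. The compiler then searches for an implicit view of `inner`
    // that has a matching `count`, finds `wrap`, and compiles the body as
    // wrap(inner).count(box): this same method on a fresh wrapper.
    def count(box: Box[Long]): String = inner.count(box)
  }

  // Implicit wrapper conversion, similar in spirit to the wrappers in the Scala API.
  implicit def wrap(inner: JavaCounter): ScalaCounter = new ScalaCounter(inner)

  // Guarded call so the sketch reports the failure instead of crashing.
  val outcome: String =
    try new ScalaCounter(new JavaCounter).count(new Box(1L))
    catch { case _: StackOverflowError => "stack overflow" }
}
```

The code compiles without any warning about the recursion, which is why a test that actually invokes the method is the practical safeguard here.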

   * @see `org.apache.kafka.streams.kstream.SessionWindowedKStream#count`
   */
  def count(materialized: Materialized[K, Long, SessionStore[Bytes, Array[Byte]]]): KTable[Windowed[K], Long] =
    inner.count(materialized)
Contributor Author

This one would have triggered a StackOverflowError by calling itself in a loop.

@joan38 joan38 changed the title from "MINOR: Type alias refactor in Streams Scala API" to "Count fix and Type alias refactor in Streams Scala API" May 11, 2018
@guozhangwang guozhangwang merged commit 40d191b into apache:trunk May 11, 2018
@guozhangwang
Contributor

Thanks @joan38 ! Merged to trunk.

@joan38 joan38 deleted the materialized branch May 11, 2018 17:25
Contributor Author

joan38 commented May 11, 2018

Thanks @guozhangwang

ying-zheng pushed a commit to ying-zheng/kafka that referenced this pull request Jul 6, 2018
…#4966)

Reviewers: Debasish Ghosh <dghosh@acm.org>, Guozhang Wang <guozhang@confluent.io>