Skip to content

KAFKA-7316 Use of filter method in KTable.scala may result in StackOverflowError#5543

Closed
tedyu wants to merge 3 commits intoapache:trunkfrom
tedyu:trunk
Closed

KAFKA-7316 Use of filter method in KTable.scala may result in StackOverflowError#5543
tedyu wants to merge 3 commits intoapache:trunkfrom
tedyu:trunk

Conversation

@tedyu
Copy link
Copy Markdown
Contributor

@tedyu tedyu commented Aug 21, 2018

Due to lack of conversion to kstream Predicate, existing filter method in KTable.scala would result in StackOverflowError.

This PR fixes the bug and adds calls in StreamToTableJoinScalaIntegrationTestImplicitSerdes.testShouldCountClicksPerRegion to prevent regression.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@tedyu
Copy link
Copy Markdown
Contributor Author

tedyu commented Aug 21, 2018

@mjsax @guozhangwang
Please take a look.

@vvcephei
Copy link
Copy Markdown
Contributor

See also: #5538

@joan38
Copy link
Copy Markdown
Contributor

joan38 commented Aug 21, 2018

Hi @tedyu,

Thanks for submitting the PR. As @vvcephei stated I submitted 2 PRs about this #5538 and #5539 with the test cases based on another issue we had in #5543.
I didn't catch the peek one tho.

Thanks

@tedyu
Copy link
Copy Markdown
Contributor Author

tedyu commented Aug 21, 2018

Once your PR gets merged, I will adjust mine keeping changes to FunctionConversions.scala, etc which fix peek.

@joan38
Copy link
Copy Markdown
Contributor

joan38 commented Aug 21, 2018

@tedyu sounds good to me.
We are waiting on #5543 to get merged first.

*/
def foreach(action: (K, V) => Unit): Unit =
inner.foreach((k: K, v: V) => action(k, v))
inner.foreach(((k: K, v: V) => action(k, v)).asForeachAction)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can become action.asForeachAction

*/
def peek(action: (K, V) => Unit): KStream[K, V] =
inner.peek((k: K, v: V) => action(k, v))
inner.peek(((k: K, v: V) => action(k, v)).asForeachAction)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

action.asForeachAction

}
userRegionsTable.filterNot { (_, _) =>
false
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once #5543 gets merged, we can implement a proper test a bit like b61cf73#diff-def535526870ef4e9f8b5dcd83b812d5R32.

@mjsax mjsax added the streams label Aug 21, 2018
@mjsax
Copy link
Copy Markdown
Member

mjsax commented Aug 21, 2018

@joan38 Can you update your PR title to reference the JIRA number as they address the same issue. Thx.

@joan38
Copy link
Copy Markdown
Contributor

joan38 commented Aug 21, 2018

@mjsax done

@tedyu
Copy link
Copy Markdown
Contributor Author

tedyu commented Aug 21, 2018

Should I create another JIRA (for peek) since @joan38 has fixed what KAFKA-7316 describes ?

@tedyu tedyu closed this Aug 21, 2018
@joan38
Copy link
Copy Markdown
Contributor

joan38 commented Aug 21, 2018

@tedyu Did you just abandoned this PR or you are going to open a new one for peek? Do you want me to open it?

@tedyu
Copy link
Copy Markdown
Contributor Author

tedyu commented Aug 21, 2018

After your PR gets merged, I will create a new one for peek.

Thanks

@tedyu
Copy link
Copy Markdown
Contributor Author

tedyu commented Aug 23, 2018

Followed test example from #5539 for testing peek.
Once #5539 is merged, I will create PR for peek.

@tedyu
Copy link
Copy Markdown
Contributor Author

tedyu commented Aug 24, 2018

Created #5566 for fixing peek

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants