Skip to content

MINOR: Fix streams Scala peek recursive call#5566

Merged
guozhangwang merged 4 commits intoapache:trunkfrom
tedyu:trunk
Aug 29, 2018
Merged

MINOR: Fix streams Scala peek recursive call#5566
guozhangwang merged 4 commits intoapache:trunkfrom
tedyu:trunk

Conversation

@tedyu
Copy link
Copy Markdown
Contributor

@tedyu tedyu commented Aug 24, 2018

This PR fixes the previously recursive call of Streams Scala peek

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

testDriver.close()
}

"peek a KStream" should "side effect records" in {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the test name is a bit confusing to me: peek should not have side effect on records right?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this test we are creating a side effect since we modify an external var.
Maybe it should be side effect on records?
Anyway if we change the name here we should probably change on foreach too.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah the name is a bit confusing: I thought it meant that peek / foreach can modify the records themselves, which is not correct. If it is meant for "peek / foreach can have side effects on external variables", any operators like map / filter can have that side effect as well.

Copy link
Copy Markdown
Contributor

@joan38 joan38 Aug 24, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tedyu What about: should run foreach actions on records?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tedyu @joan38 that looks fine to me.

Copy link
Copy Markdown
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @vvcephei to take another look.

acc shouldBe "value1value2"

testDriver.close()
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would do something like this:

  "peek a KStream" should "side effect records" in {
    val builder = new StreamsBuilder()
    val sourceTopic = "source"
    val sinkTopic = "sink"

    var acc = ""
    builder.stream[String, String](sourceTopic).peek((_, v) => acc += v).to(sinkTopic)

    val testDriver = createTestDriver(builder)

    testDriver.pipeRecord(sourceTopic, ("1", "value1"))
    acc shouldBe "value1"
    testDriver.readRecord[String, String](sinkTopic).value shouldBe "value1"
    
    testDriver.pipeRecord(sourceTopic, ("2", "value2"))
    acc shouldBe "value1value2"
    testDriver.readRecord[String, String](sinkTopic).value shouldBe "value2"

    testDriver.close()
  }

@@ -91,6 +91,22 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver {
testDriver.close()
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you do the change on this one too please? I forgot to test in between.

    testDriver.pipeRecord(sourceTopic, ("1", "value1"))
    acc shouldBe "value1"
    
    testDriver.pipeRecord(sourceTopic, ("2", "value2"))
    acc shouldBe "value1value2"

@joan38
Copy link
Copy Markdown
Contributor

joan38 commented Aug 24, 2018

Minor comments, LGTM 👍

@tedyu
Copy link
Copy Markdown
Contributor Author

tedyu commented Aug 24, 2018

Thanks for the review, @joan38

I have addressed your comments

Copy link
Copy Markdown
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Other than the naming, lgtm.

testDriver.close()
}

"peek a KStream" should "side effect records" in {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah the name is a bit confusing: I thought it meant that peek / foreach can modify the records themselves, which is not correct. If it is meant for "peek / foreach can have side effects on external variables", any operators like map / filter can have that side effect as well.

@mjsax mjsax added the streams label Aug 24, 2018
val testDriver = createTestDriver(builder)

testDriver.pipeRecord(sourceTopic, ("1", "value1"))
acc shouldBe "value1"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for my OCD but could you add a new line after this so that we see it's different steps of the test?

@joan38
Copy link
Copy Markdown
Contributor

joan38 commented Aug 24, 2018

@tedyu Just wanted to make sure you saw my comment above #5566 (comment)

@tedyu
Copy link
Copy Markdown
Contributor Author

tedyu commented Aug 24, 2018

Are you referring to comment about side effect ?

I saw it.

@joan38
Copy link
Copy Markdown
Contributor

joan38 commented Aug 24, 2018

Yes. WDYT? I'm really bad a naming/phrasing...

@tedyu
Copy link
Copy Markdown
Contributor Author

tedyu commented Aug 24, 2018

I would wait for @guozhangwang to comment.

I think the current form should be fine. But I am open to refinement.

@tedyu
Copy link
Copy Markdown
Contributor Author

tedyu commented Aug 27, 2018

@guozhangwang @joan38
Is there anything else I should do for this PR ?

thanks

@joan38
Copy link
Copy Markdown
Contributor

joan38 commented Aug 27, 2018

I think we are all good on this one.
Thanks @tedyu

Copy link
Copy Markdown
Contributor

@vvcephei vvcephei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It LGTM as well. Thanks @tedyu

/cc @guozhangwang

@tedyu
Copy link
Copy Markdown
Contributor Author

tedyu commented Aug 28, 2018

Ping @guozhangwang

@joan38
Copy link
Copy Markdown
Contributor

joan38 commented Aug 28, 2018

The most satisfying part of our work summarised in one word:
screen shot 2018-08-28 at 17 31 14

@guozhangwang
Copy link
Copy Markdown
Contributor

Ping @guozhangwang

Sorry for the delay, I was OOO yesterday.

@guozhangwang guozhangwang merged commit 34d9ae6 into apache:trunk Aug 29, 2018
@vvcephei
Copy link
Copy Markdown
Contributor

@guozhangwang This needs to be cherry picked to 2.0 as well

@guozhangwang
Copy link
Copy Markdown
Contributor

Yup! Done.

guozhangwang pushed a commit that referenced this pull request Aug 29, 2018
This PR fixes the previously recursive call of Streams Scala peek

Reviewers: Joan Goyeau <joan@goyeau.com>, Guozhang Wang <guozhang@confluent.io>, John Roesler <john@confluent.io>
@vvcephei
Copy link
Copy Markdown
Contributor

Thanks!

@paambaati
Copy link
Copy Markdown

If you don't mind me asking, what is the release/publish plan for this?

@vvcephei
Copy link
Copy Markdown
Contributor

It will be in 2.0.1, which it looks like hasn't been planned yet: https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan

pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
This PR fixes the previously recursive call of Streams Scala peek

Reviewers: Joan Goyeau <joan@goyeau.com>, Guozhang Wang <guozhang@confluent.io>, John Roesler <john@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants