Skip to content

KAFKA-7758: Reuse KGroupedStream/KGroupedTable with named repartition topics#6265

Merged
mjsax merged 5 commits intoapache:trunkfrom
bbejeck:KAFKA-7758_resuse_KGroupStreamWithNamedRepartitionTopics
Feb 17, 2019
Merged

KAFKA-7758: Reuse KGroupedStream/KGroupedTable with named repartition topics#6265
mjsax merged 5 commits intoapache:trunkfrom
bbejeck:KAFKA-7758_resuse_KGroupStreamWithNamedRepartitionTopics

Conversation

@bbejeck
Copy link
Copy Markdown
Member

@bbejeck bbejeck commented Feb 13, 2019

This PR adds support for re-using a KGroupedStream or `KGroupedTable object after executing an aggregation operation with a named repartition topic.

KGroupedStream example

final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic").selectKey((k, v) -> k).groupByKey(as("grouping"));
kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count().toStream().to("output-one");
kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count().toStream().to("output-two");

KGroupedTable example

final KGroupedTable<String, String> kGroupedTable = builder.<String, String>table("topic").groupBy(KeyValue::pair, Grouped.as("grouping"));
 kGroupedTable.count().toStream().to("output-count");
 kGroupedTable.reduce((v, v2) -> v2, (v, v2) -> v2).toStream().to("output-reduce");

This approach will not cause any compatibility issues for two reasons

  1. Aggregations requiring repartitioning without naming the repartition topic maintain the same topology structure, which is the default mode today. So by not reusing the repartition graph node, the numbering and structure of the topology remains the same.
  2. Aggregations where the repartition topic is named, it is not possible at the moment to re-use either the KGroupedStream or KGroupedTable object as Kafka Streams throws an InvalidTopologyException when building the topology. Hence you can't even deploy the application.

I've added unit tests for each case and ran our existing suite of streams tests.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Copy link
Copy Markdown
Member Author

@bbejeck bbejeck Feb 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the first invocation, the repartitionNode is always null, so we'll build the repartition node and set the variable. But on subsequent calls, only if the user has provided a name for the repartition topic will we re-use the repartition graph node.

This maintains compatibility as users not naming the repartition topic will get the same topology and re-using the KGroupedStream results in an InvalidToplogyException when building the topology

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and re-using the KGroupedStream results in an InvalidToplogyException when building the topology

I thought, if there is no user topic-name, old code would create multiple repartition topics? And re-using KGroupedStream only throughs if there is a user topic-name (and this restriction is lifted with this PR)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a hard time reasoning about the or condition and what exactly happens when there is a name / isn't a name / repartition isn't required at all.

Can we consider breaking these up into three separate cases to make the code more linear?

Copy link
Copy Markdown
Member Author

@bbejeck bbejeck Feb 15, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought, if there is no user topic-name, old code would create multiple repartition topics? And re-using KGroupedStream only throughs if there is a user topic-name (and this restriction is lifted with this PR)

That's correct. I may not have been clear above, but what I meant is that this change won't break compatibility with users currently not providing a repartition topic name as it will create multiple repartition topics thus keep their topology the same. Does that make sense?

Copy link
Copy Markdown
Member Author

@bbejeck bbejeck Feb 15, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we consider breaking these up into three separate cases to make the code more linear?

Repartitons are required when the repartitionRequired flag is true as passed in from the parent KStream instance https://github.com/apache/kafka/pull/6265/files#diff-2585c8864b9121454af6d88adba60975R81. A name for the repartition topic is always required; it's just a matter of using a generated one or the user provided one.

So we are left with just two cases:

  1. On the first call the to GroupedStreamAggregateBuilder#build() method the reparitionNode will always be null, so we need to create it, regardless if the user has provided a reparation topic name or not.
  2. The other case is when we make subsequent GroupedStreamAggregateBuilder#build() calls to the same GroupedStreamAggregateBuilder instance; then the name does come into play. If the user has provided a repartition topic name, we would reuse the same repartition node. Otherwise, it's null, so we know to create a new repartition node.

I originally had this:

if (repartitionNode == null) {
    repartitionNode = repartitionNodeBuilder.build();
} else if (userProvidedRepartitionTopicName == null) {
    repartitionNode = repartitionNodeBuilder.build();
}

But I felt that having multiple lines with the same variable assignment did not make sense. What do think about me adding my explanation here to line 86 as comments?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, comments would help. I think the logic is correct, but it's just a little complex to reason about. Some stuff I came to realize only by poking around the code:

  1. there is a 1:1 relationship between a GroupedStreamAggregateBuilder instance and a KGroupedStreamImpl instance (i.e., we construct the builder in the constructor of the grouped stream node), so when we cache a repartition node in the builder, it's exactly like caching it in the KGroupedStream node, which is intuitively what we wanted to do.
  2. If repartition is not required, we don't build a repartition node at all, but just bypass it in the graph.
  3. Although we cache the repartition node in an instance field, we actually only use it if the node is user-named. If the name is auto-generated, we overwrite the repartition node every time before adding it to the graph.

2 and 3 were basically what I had in mind as separate branches. Something like:

  • if repartition not required, just connect the agg node directly to the source node
  • if repartition is required and not user-named, make a brand-new repartition node, and hook it up to the source node and hook the agg node up to the repartition node
  • if the repartition is required and user-named, then check if there's already a repartition node (if not, create one, hook it up to the source node, and cache it), then hook the agg node up to the repartition node

This is just to clarify my comment above. If you prefer to clarify the flow with the explanation you gave, that's fine as well.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's correct. I may not have been clear above, but what I meant is that this change won't break compatibility with users currently not providing a repartition topic name as it will create multiple repartition topics thus keep their topology the same. Does that make sense?

Ack. Just wanted to make sure we are on the same page :)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 and 3 were basically what I had in mind as separate branches.

We already have this

  1. Line 84 is the guard for checking if a repartition is not required, if not then we do skip all the repartition node creation steps and just add the agg node to the source node.
  2. If a repartition is required we drop into the block and make the necessary node creation and graph links.

I'll add a comment on the boolean statement just to clarify what's going on.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Thanks.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not in-line this method as I feel it helps with the readability of the code.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it? It's a one liner calling createRepartitionedSource, so what does we gain by calling createRepartitionSource instead?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it's personal preference if you insist I can inline it.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't insist -- would prefer though (for this case) -- in general, this pattern can be useful -- just don't see it for this particular case.

Copy link
Copy Markdown
Member Author

@bbejeck bbejeck Feb 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the first invocation, the repartitionNode is always null, so we'll build the repartition node and set the variable. But on subsequent calls, only if the user has provided a name for the repartition topic will we re-use the repartition graph node.

This maintains compatibility as users not naming the repartition topic will get the same topology and re-using the KGroupedTable results in an InvalidToplogyException when building the topology

@bbejeck
Copy link
Copy Markdown
Member Author

bbejeck commented Feb 13, 2019

@ableegoldman, @guozhangwang, @mjsax, @vvcephei for reviews

@bbejeck
Copy link
Copy Markdown
Member Author

bbejeck commented Feb 14, 2019

Java 11 failure unrelated org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testProduceConsumeConnector

retest this please

Copy link
Copy Markdown
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM -- just couple of nits.

One related by orthogonal question: what happens if a TimeWindowedKStream or SessionWindowedKStream is reused?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and re-using the KGroupedStream results in an InvalidToplogyException when building the topology

I thought, if there is no user topic-name, old code would create multiple repartition topics? And re-using KGroupedStream only throughs if there is a user topic-name (and this restriction is lifted with this PR)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use static import to shorten line length?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it? It's a one liner calling createRepartitionedSource, so what does we gain by calling createRepartitionSource instead?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for renaming! Much better!

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: break long line into multiple

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: break long line

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove one empty line

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

Copy link
Copy Markdown
Contributor

@vvcephei vvcephei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. I just left one comment about readability, but I think it's also fine if you prefer it the way it is.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this javadoc still accurate?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not your code, but did we want the assert on L73 to execute all the time, rather than just when java is executed with -ea?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure. \cc @guozhangwang

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had a hard time reasoning about the or condition and what exactly happens when there is a name / isn't a name / repartition isn't required at all.

Can we consider breaking these up into three separate cases to make the code more linear?

@bbejeck
Copy link
Copy Markdown
Member Author

bbejeck commented Feb 15, 2019

One related by orthogonal question: what happens if a TimeWindowedKStream or SessionWindowedKStream is reused?

Good question! When calling either KGroupedStream.windowedBy(Windows<K>) or KGroupedStreams.windowedBy(SessionWindows) the GroupedStreamAggregateBuilder instance is passed along to the corresponding TimeWindowedKStream or SessionWindowedKStream instance, so the same behavior demonstrated with resuing KGroupedStream with named repartition topics will occur. I'll add a unit test case for though just for completeness.

@bbejeck bbejeck force-pushed the KAFKA-7758_resuse_KGroupStreamWithNamedRepartitionTopics branch from befacb6 to f09bfe1 Compare February 15, 2019 15:55
@bbejeck
Copy link
Copy Markdown
Member Author

bbejeck commented Feb 15, 2019

updated this per comments and rebased with trunk

@mjsax @vvcephei

Copy link
Copy Markdown
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

I don't see any issue with the logic (and don't think comments are necessary -- also, it might be better to refactor code before adding comments, so the code documents itself.) Leave it up to you if you want to address John's comment.

If you refactor the code, I can also have a look again.

@bbejeck
Copy link
Copy Markdown
Member Author

bbejeck commented Feb 16, 2019

Java 11 had an unrelated failure kafka.api.SaslSslAdminClientIntegrationTest.testMinimumRequestTimeouts and timed out
Java 8 had and unrelated failure kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup and timed out as well

retest this please

@bbejeck
Copy link
Copy Markdown
Member Author

bbejeck commented Feb 16, 2019

both Java 11 and Java 8 timed out

retest this please

@mjsax
Copy link
Copy Markdown
Member

mjsax commented Feb 16, 2019

Java11 passed.
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/19519/

java.util.concurrent.ExecutionException: Boxed Error
	at scala.concurrent.impl.Promise$.resolver(Promise.scala:59)
	at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:51)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
	at scala.concurrent.Promise$class.complete(Promise.scala:55)
	at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:157)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:23)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.AssertionError: Received 0, expected at least 68
	at org.junit.Assert.fail(Assert.java:89)
	at org.junit.Assert.assertTrue(Assert.java:42)
	at kafka.api.ConsumerBounceTest.kafka$api$ConsumerBounceTest$$receiveAndCommit(ConsumerBounceTest.scala:562)
	at kafka.api.ConsumerBounceTest$$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcVI$sp$1$$anonfun$1.apply$mcV$sp(ConsumerBounceTest.scala:347)
	at kafka.api.ConsumerBounceTest$$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcVI$sp$1$$anonfun$1.apply(ConsumerBounceTest.scala:346)
	at kafka.api.ConsumerBounceTest$$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$mcVI$sp$1$$anonfun$1.apply(ConsumerBounceTest.scala:346)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	... 7 more

and

java.lang.AssertionError: Condition not met within timeout 15000. Streams never started.
	at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:365)
	at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:325)
	at org.apache.kafka.streams.KafkaStreamsTest.shouldThrowOnCleanupWhileRunning(KafkaStreamsTest.java:556)

and

java.lang.AssertionError: Condition not met within timeout 15000. Streams never started.
	at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:365)
	at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:325)
	at org.apache.kafka.streams.KafkaStreamsTest.testStateThreadClose(KafkaStreamsTest.java:255)

Retest this please.

@mjsax
Copy link
Copy Markdown
Member

mjsax commented Feb 17, 2019

This time Java8 passed.

https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/2422/

14:19:51 kafka.api.SaslSslAdminClientIntegrationTest > testMinimumRequestTimeouts FAILED
14:19:51     java.lang.AssertionError: Expected an exception of type org.apache.kafka.common.errors.TimeoutException; got type org.apache.kafka.common.errors.SslAuthenticationException

Thus, we got one green build each. Merging to trunk.

@mjsax mjsax merged commit 247ccd7 into apache:trunk Feb 17, 2019
mjsax pushed a commit that referenced this pull request Feb 17, 2019
… topics (#6265)

Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>
@mjsax
Copy link
Copy Markdown
Member

mjsax commented Feb 17, 2019

Merged to trunk and cherry-picked to 2.2.

@bbejeck bbejeck deleted the KAFKA-7758_resuse_KGroupStreamWithNamedRepartitionTopics branch February 18, 2019 01:09
jarekr pushed a commit to confluentinc/kafka that referenced this pull request Apr 18, 2019
* ak/trunk: (45 commits)
  KAFKA-7487: DumpLogSegments misreports offset mismatches (apache#5756)
  MINOR: improve JavaDocs about auto-repartitioning in Streams DSL (apache#6269)
  KAFKA-7935: UNSUPPORTED_COMPRESSION_TYPE if ReplicaManager.getLogConfig returns None (apache#6274)
  KAFKA-7895: Fix stream-time reckoning for suppress (apache#6278)
  KAFKA-6569: Move OffsetIndex/TimeIndex logger to companion object  (apache#4586)
  MINOR: add log indicating the suppression time (apache#6260)
  MINOR: Make info logs for KafkaConsumer a bit more verbose (apache#6279)
  KAFKA-7758: Reuse KGroupedStream/KGroupedTable with named repartition topics (apache#6265)
  KAFKA-7884; Docs for message.format.version should display valid values (apache#6209)
  MINOR: Save failed test output to build output directory
  MINOR: add test for StreamsSmokeTestDriver (apache#6231)
  MINOR: Fix bugs identified by compiler warnings (apache#6258)
  KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [part 4] (apache#5433)
  MINOR: fix bypasses in ChangeLogging stores (apache#6266)
  MINOR: Make MockClient#poll() more thread-safe (apache#5942)
  MINOR: drop dbAccessor reference on close (apache#6254)
  KAFKA-7811: Avoid unnecessary lock acquire when KafkaConsumer commits offsets (apache#6119)
  KAFKA-7916: Unify store wrapping code for clarity (apache#6255)
  MINOR: Add missing Alter Operation to Topic supported operations list in AclCommand
  KAFKA-7921: log at error level for missing source topic (apache#6262)
  ...
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
… topics (apache#6265)

Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants