Skip to content

KAFKA-7671: Stream-Global Table join should not reset repartition flag#5959

Merged
guozhangwang merged 1 commit intoapache:trunkfrom
bbejeck:KAFKA-7671_stream_globalKTable_join_should_not_reset_repartition_flag
Nov 29, 2018
Merged

KAFKA-7671: Stream-Global Table join should not reset repartition flag#5959
guozhangwang merged 1 commit intoapache:trunkfrom
bbejeck:KAFKA-7671_stream_globalKTable_join_should_not_reset_repartition_flag

Conversation

@bbejeck
Copy link
Copy Markdown
Member

@bbejeck bbejeck commented Nov 28, 2018

This PR fixes an issue reported from a user. When we join a KStream with a GlobalKTable we should not reset the repartition flag as the stream may have previously changed its key, and the resulting stream could be used in an aggregation operation or join with another stream which may require a repartition for correct results.

I've added a test which fails without the fix.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@bbejeck
Copy link
Copy Markdown
Member Author

bbejeck commented Nov 28, 2018

ping @guozhangwang, @mjsax, and @vvcephei for reviews.

@mjsax mjsax added the streams label Nov 28, 2018
Copy link
Copy Markdown
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@vvcephei
Copy link
Copy Markdown
Contributor

LGTM also. Thanks, Bill!

The test failure was unrelated:

kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts

Error Message
org.junit.runners.model.TestTimedOutException: test timed out after 120000 milliseconds
Stacktrace
org.junit.runners.model.TestTimedOutException: test timed out after 120000 milliseconds
	at java.lang.Object.wait(Native Method)
	at java.lang.Object.wait(Object.java:502)
	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:92)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
	at kafka.utils.TestUtils$.assertFutureExceptionTypeEquals(TestUtils.scala:1418)
	at kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts(AdminClientIntegrationTest.scala:1068)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:748)
Standard Output
[2018-11-28 16:11:28,376] ERROR [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Error for partition topic-0 at offset 0 (kafka.server.ReplicaFetcherThread:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
[2018-11-28 16:11:28,376] ERROR [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Error for partition topic-0 at offset 0 (kafka.server.ReplicaFetcherThread:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
[2018-11-28 16:11:43,118] ERROR [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] Error for partition topic-0 at offset 0 (kafka.server.ReplicaFetcherThread:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
[2018-11-28 16:11:43,118] ERROR [ReplicaFetcher replicaId=0, leaderId=2, fetcherId=0] Error for partition topic-0 at offset 0 (kafka.server.ReplicaFetcherThread:76)
org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.
[2018-11-28 16:11:52,975] ERROR Uncaught exception in thread 'kafka-admin-client-thread | adminclient-8': (org.apache.kafka.common.utils.KafkaThread:51)
java.lang.OutOfMemoryError: Java heap space
	at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
	at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
	at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
	at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
	at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
	at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:631)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:562)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:468)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
	at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1125)
	at java.lang.Thread.run(Thread.java:748)

... maybe the runner ran out of memory?

Retest this, please.

@guozhangwang
Copy link
Copy Markdown
Contributor

LGTM!

@guozhangwang guozhangwang merged commit 2c305dc into apache:trunk Nov 29, 2018
guozhangwang pushed a commit that referenced this pull request Nov 29, 2018
#5959)

This PR fixes an issue reported from a user. When we join a KStream with a GlobalKTable we should not reset the repartition flag as the stream may have previously changed its key, and the resulting stream could be used in an aggregation operation or join with another stream which may require a repartition for correct results.

I've added a test which fails without the fix.

Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
guozhangwang pushed a commit that referenced this pull request Nov 29, 2018
#5959)

This PR fixes an issue reported from a user. When we join a KStream with a GlobalKTable we should not reset the repartition flag as the stream may have previously changed its key, and the resulting stream could be used in an aggregation operation or join with another stream which may require a repartition for correct results.

I've added a test which fails without the fix.

Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
guozhangwang pushed a commit that referenced this pull request Nov 29, 2018
#5959)

This PR fixes an issue reported from a user. When we join a KStream with a GlobalKTable we should not reset the repartition flag as the stream may have previously changed its key, and the resulting stream could be used in an aggregation operation or join with another stream which may require a repartition for correct results.

I've added a test which fails without the fix.

Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
@guozhangwang
Copy link
Copy Markdown
Contributor

Cherry-picked to 2.1 / 2.0 / 1.1 as well. I did not cherry-pick to older versions since the code base has been diverged a lot (there's already quite some conflicts to resolve at 1.1).

@mjsax
Copy link
Copy Markdown
Member

mjsax commented Nov 29, 2018

The fix itself should be a one liner. Maybe just back-port this without the test? Thoughts?

pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
apache#5959)

This PR fixes an issue reported from a user. When we join a KStream with a GlobalKTable we should not reset the repartition flag as the stream may have previously changed its key, and the resulting stream could be used in an aggregation operation or join with another stream which may require a repartition for correct results.

I've added a test which fails without the fix.

Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
@bbejeck bbejeck deleted the KAFKA-7671_stream_globalKTable_join_should_not_reset_repartition_flag branch July 10, 2024 12:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants