KAFKA-9127: don't create StreamThreads for global-only topology #8540

Merged
vvcephei merged 11 commits into apache:trunk from ableegoldman:9127-dont-create-streamthreads-for-global-topology
Apr 29, 2020

Conversation

@ableegoldman
Member

@ableegoldman ableegoldman commented Apr 23, 2020

This should fix the flaky GlobalKTableIntegrationTest.shouldRestoreGlobalInMemoryKTableOnRestart as well as address the issue on the ticket (avoid unnecessary group coordination overhead).

The deeper issue (and the root cause of the test's flakiness) is that currently every StreamThread in a global-only topology hits an IllegalStateException upon trying to poll. Once they have all died, the KafkaStreams instance goes into the ERROR state despite the global thread being alive and well.

The fix is to check whether the topology is global-only and, in that case, override the number of stream threads (num.stream.threads) to 0. This PR also adds a check for completely empty topologies and throws an exception to fail fast.
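As a rough illustration of the decision described above, here is a minimal sketch in plain Java. It is not the code from this PR: the class name, the method resolveNumStreamThreads, the exception type, and the two topic sets are all hypothetical stand-ins, and the real check lives inside KafkaStreams and works on the actual topology.

```java
import java.util.Collections;
import java.util.Set;

public class ThreadCountSketch {

    /** Hypothetical stand-in for the exception raised on an empty topology. */
    static class EmptyTopologyException extends RuntimeException {
        EmptyTopologyException(String message) {
            super(message);
        }
    }

    /**
     * Decide how many stream threads to start (hypothetical helper).
     *
     * @param sourceTopics      topics read by regular, non-global source nodes
     * @param globalTopics      topics backing global stores or global tables
     * @param configuredThreads the user-configured thread count
     */
    static int resolveNumStreamThreads(Set<String> sourceTopics,
                                       Set<String> globalTopics,
                                       int configuredThreads) {
        boolean hasSources = !sourceTopics.isEmpty();
        boolean hasGlobal = !globalTopics.isEmpty();

        // Nothing to read at all: fail fast instead of starting an idle app.
        if (!hasSources && !hasGlobal) {
            throw new EmptyTopologyException(
                "Topology has no source topics and no global topics");
        }
        // Global-only: stream threads would have nothing to poll, so start none.
        if (!hasSources) {
            return 0;
        }
        return configuredThreads;
    }

    public static void main(String[] args) {
        Set<String> none = Collections.emptySet();
        Set<String> input = Collections.singleton("input-topic");
        Set<String> global = Collections.singleton("global-topic");

        System.out.println(resolveNumStreamThreads(input, none, 4));
        System.out.println(resolveNumStreamThreads(none, global, 4));
    }
}
```

The point of the sketch is only the ordering of the checks: the empty case is rejected first, and the configured value is ignored when there is nothing for a stream thread to consume.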

edit: it turns out this patch also happens to fix a regression introduced in 2.5, when we switched from the previous pattern subscription (for which an empty subscription is mysteriously considered fine) to collection subscription (for which an empty subscription is illegal). This should be cherry-picked back to 2.5.

Contributor

@vvcephei vvcephei left a comment

Hey @ableegoldman ,

Thanks for this fix! It looks right to me. I just had one request for the test.

Thanks,
-John

Comment on lines 168 to 169
Contributor

Can you create a method to getBuilderWithSource() and just call it inline instead of creating a new mutable field for all the tests to rely on?

Member Author

Good call, done

@vvcephei
Contributor

test this please

Contributor

@vvcephei vvcephei left a comment

Thanks for the update, @ableegoldman !

@mjsax mjsax added the streams and tests labels and removed the tests label on Apr 24, 2020 (tests: Test fixes, including flaky tests)
Member

@mjsax mjsax left a comment

Nice fix! Couple of questions/comments.

Comment thread checkstyle/suppressions.xml Outdated
Member

Do we really need to add this exception? How much work would it be to reduce the complexity of KafkaStreams?

Contributor

NPathComplexity is a tough one to work around. We'd wind up having to move some blocks of logic to separate helper classes.

Comment thread streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java Outdated
Member

Do we really want to do this? I understand that an empty topology does not make sense, and it would be appropriate to log a WARN -- but do we need/want to reject it?

Also, should we instead throw an InvalidTopologyException? Furthermore, should we add a similar check to StreamsBuilder.build() to raise this error even earlier? (We would still need this check, though.)

Member Author

I guess I can't personally imagine any reason to ever want an app running with an empty topology, and would prefer to be notified immediately since I presumably did something wrong. But if you feel strongly about allowing this I can demote this to a warning

Member Author

@ableegoldman ableegoldman Apr 24, 2020

Also, do we have an InvalidTopologyException or similar exception already? Or were you proposing to add a new type?
edit: Never mind, it's just TopologyException, found it

Member

The change to reject an empty topology makes our testing rather "ugly"... Do we really need to reject it?

Member Author

Is getBuilderWithSource really that much uglier than new StreamsBuilder? :P

Member Author

Would a class-level final StreamsBuilder builder = new StreamsBuilder() that we add a source to in the setUp be any better, iyo?

Contributor

@vvcephei vvcephei Apr 24, 2020

I think the suggestion was more along the lines of not throwing an exception while building an empty topology.

I'm not sure. It seems kind of nice to find out right away that your program will do absolutely nothing. I'm not totally sure you could really run an empty topology. Can you subscribe a Consumer to "no topics"?

Member Author

I realize he wasn't just complaining about the name, but I was trying to keep that discussion in one thread. But I guess you can only do so much to keep PR chatter oriented in one place

Member Author

But @vvcephei's last question gets right to the heart of the matter. The answer is "technically yes, but it will crash if you try to poll for said nothing, so really no".
That's why the test was flaky, and the reason for this PR in the first place. (Avoiding group overhead is the name of the ticket, but in reality it will only happen once before all the StreamThreads die from polling no topics.)
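To make the "technically yes, but really no" answer concrete, here is a toy model in plain Java. ToyConsumer is a hypothetical stand-in and deliberately not the real KafkaConsumer; it only mimics the behavior described in this thread, where subscribing to an empty collection is accepted but a later poll fails with an IllegalStateException.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class EmptySubscriptionSketch {

    /** Toy stand-in for a consumer; NOT the real KafkaConsumer. */
    static class ToyConsumer {
        private final Set<String> subscription = new HashSet<>();

        // Subscribing to nothing is accepted without complaint.
        void subscribe(Collection<String> topics) {
            subscription.clear();
            subscription.addAll(topics);
        }

        // Polling with no subscription is where things blow up.
        List<String> poll() {
            if (subscription.isEmpty()) {
                throw new IllegalStateException(
                    "Consumer is not subscribed to any topics");
            }
            return Collections.emptyList(); // no records in this toy model
        }
    }

    /** Returns true if a subscribe-then-poll cycle completes without error. */
    static boolean pollSucceeds(Collection<String> topics) {
        ToyConsumer consumer = new ToyConsumer();
        consumer.subscribe(topics);
        try {
            consumer.poll();
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(pollSucceeds(Collections.singleton("input-topic")));
        System.out.println(pollSucceeds(Collections.<String>emptySet()));
    }
}
```

In this model the failure surfaces only at poll time, which matches why a thread with nothing to read appears to start normally and then dies.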

Member

Thanks for the clarification.

Comment thread streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java Outdated
Comment thread streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java Outdated
@ableegoldman ableegoldman force-pushed the 9127-dont-create-streamthreads-for-global-topology branch from 60bad98 to f1702f1 Compare April 27, 2020 23:12
}

@Test
public void shouldGetToRunningWithOnlyGlobalTopology() throws Exception {
Member Author

Copied this over to trunk and verified that it fails consistently

@vvcephei
Contributor

test this please

@vvcephei
Contributor

test this please

@vvcephei
Contributor

retest this please

@vvcephei
Contributor

test this please

@vvcephei
Contributor

test this please

@vvcephei
Contributor

test this please

@vvcephei
Contributor

test this please

@vvcephei
Contributor

retest this please

@ableegoldman
Member Author

One unrelated failure: MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete

@vvcephei vvcephei merged commit b5de449 into apache:trunk Apr 29, 2020
@vvcephei
Contributor

Cherry-pick for 2.5 in progress...

vvcephei pushed a commit that referenced this pull request Apr 29, 2020
Reviewers: Matthias J. Sax <matthias@confluent.io>, John Roesler <vvcephei@apache.org>
@vvcephei
Contributor

Cherry-picked to 2.5 as 9e2785f

ijuma added a commit to confluentinc/kafka that referenced this pull request Apr 30, 2020
…/master`

* apache-github/trunk: (45 commits)
  MINOR: Fix broken JMX link in docs by adding missing starting double quote (apache#8587)
  KAFKA-9652: Fix throttle metric in RequestChannel and request log due to KIP-219 (apache#8567)
  KAFKA-9922: Update demo instructions in examples README (apache#8559)
  KAFKA-9830: Implement AutoCloseable in ErrorReporter and subclasses (apache#8442)
  KAFKA-9875: Make integration tests more resilient (apache#8578)
  KAFKA-9932: Don't load configs from ZK when the log has already been loaded (apache#8582)
  KAFKA-9925: decorate pseudo-topics with app id (apache#8574)
  KAFKA-9832: fix attempt to commit non-running tasks (apache#8580)
  KAFKA-9127: don't create StreamThreads for global-only topology (apache#8540)
  MINOR: add support for kafka 2.4 and 2.5 to downgrade test
  KAFKA-9176: Retry on getting local stores from KafkaStreams (apache#8568)
  KAFKA-9823: Follow-up, check state for handling commit error response (apache#8548)
  KAFKA-6145: KIP-441: Add TaskAssignor class config (apache#8541)
  MINOR: Fix partition numbering from 0 to P-1 instead of P in docs (apache#8572)
  KAFKA-9921: disable caching on stores configured to retain duplicates (apache#8564)
  Minor: remove redundant check in auto preferred leader election (apache#8566)
  MINOR: Update the link to the Raft paper in docs (apache#8560)
  MINOR: Fix typos in config properties in MM2 test (apache#8561)
  MINOR: Improve producer test BufferPoolTest#testCloseNotifyWaiters. (apache#7982)
  MINOR: document how to escape json parameters to ducktape tests (apache#8546)
  ...
ijuma added a commit to confluentinc/kafka that referenced this pull request Apr 30, 2020
There was a minor conflict in gradle.properties because the default
Scala version changed upstream to Scala 2.13. I kept the upstream
change.

Related to this, I have updated Jenkinsfile to compile and validate
with Scala 2.12 in a separate stage so that we ensure we maintain
compatibility. Unlike Apache Kafka, we only run the tests with the
default Scala version, which is now 2.13.

@ableegoldman ableegoldman deleted the 9127-dont-create-streamthreads-for-global-topology branch May 1, 2020 00:28
@mjsax
Member

mjsax commented May 1, 2020

@vvcephei Cherry-picking to 2.4 did not work? There might still be a 2.4.2 release? Or should we not back-port it to 2.4 because in 2.4 one can in fact run an empty topology?

@vvcephei
Contributor

vvcephei commented May 4, 2020

Thanks @mjsax (and sorry I didn't see your comment before).

The ticket indicates that it was introduced in 2.5 (https://issues.apache.org/jira/browse/KAFKA-9127 , link for convenience).

IIUC, in 2.4, we would needlessly create StreamThreads, but we would still be able to process global-only topologies. It was only in 2.5 that we actually lost the ability to process global-only topologies.

@ableegoldman
Member Author

@vvcephei yeah, the regression was only introduced in 2.5, so strictly we only need to backport it that far. But I would actually agree that we should backport it to 2.4 as well: in a global-only topology, any stream threads incur unnecessary group coordination overhead, resulting in unnecessary rebalances, network usage, etc. Of course, you can set the number of threads to zero to get around this, but then the KafkaStreams state would not reach RUNNING. That's one of the fixes in this PR, and it is actually worth backporting.
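The state behavior discussed here can be sketched with a small toy model in plain Java. This is an illustration under stated assumptions, not the KafkaStreams state machine: the enums, the PENDING state, and the derive method are hypothetical, and the model assumes a global thread always exists. It shows the two cases from this thread: an instance whose stream threads have all died ends up in ERROR even though the global thread is alive, and an instance with zero stream threads follows the global thread into RUNNING.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class InstanceStateSketch {

    enum ThreadState { STARTING, RUNNING, DEAD }

    // PENDING is a hypothetical catch-all for "not yet running".
    enum InstanceState { PENDING, RUNNING, ERROR }

    /** Derive a toy instance state from the thread states (hypothetical rule set). */
    static InstanceState derive(List<ThreadState> streamThreads, ThreadState globalThread) {
        if (globalThread == ThreadState.DEAD) {
            return InstanceState.ERROR;
        }
        if (globalThread != ThreadState.RUNNING) {
            return InstanceState.PENDING;
        }
        // Zero stream threads: the instance simply follows the global thread.
        if (streamThreads.isEmpty()) {
            return InstanceState.RUNNING;
        }
        boolean anyAlive = false;
        boolean allAliveRunning = true;
        for (ThreadState state : streamThreads) {
            if (state != ThreadState.DEAD) {
                anyAlive = true;
                if (state != ThreadState.RUNNING) {
                    allAliveRunning = false;
                }
            }
        }
        // Every stream thread died: ERROR, regardless of the healthy global thread.
        if (!anyAlive) {
            return InstanceState.ERROR;
        }
        return allAliveRunning ? InstanceState.RUNNING : InstanceState.PENDING;
    }

    public static void main(String[] args) {
        List<ThreadState> noThreads = Collections.emptyList();
        List<ThreadState> allDead = Arrays.asList(ThreadState.DEAD, ThreadState.DEAD);

        System.out.println(derive(noThreads, ThreadState.RUNNING));
        System.out.println(derive(allDead, ThreadState.RUNNING));
    }
}
```

The second call models the pre-fix situation for a global-only topology: the stream threads exist, die on their first poll, and drag the instance into ERROR. The first call models the outcome once no stream threads are created at all.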

@vvcephei
Contributor

vvcephei commented May 4, 2020

Thanks for the clarification. I'll see about backporting to 2.4...

@vvcephei
Contributor

vvcephei commented May 4, 2020

Actually @ableegoldman , the cherry-pick to 2.4 isn't straightforward. Do you mind doing a separate PR for it?

@vvcephei
Contributor

vvcephei commented May 4, 2020

Or, if it's too much trouble, we can just note on the ticket that it could be backported, but isn't (yet)

ableegoldman added a commit to ableegoldman/kafka that referenced this pull request May 5, 2020
…he#8540)

Reviewers: Matthias J. Sax <matthias@confluent.io>, John Roesler <vvcephei@apache.org>
@mjsax
Member

mjsax commented May 6, 2020

Thanks for back porting @ableegoldman!

vvcephei pushed a commit that referenced this pull request May 8, 2020
…#8616)

Backports: #8540

Reviewers: Matthias J. Sax <matthias@confluent.io>, John Roesler <vvcephei@apache.org>
qq619618919 pushed a commit to qq619618919/kafka that referenced this pull request May 12, 2020
…apache#8616)

Backports: apache#8540

Reviewers: Matthias J. Sax <matthias@confluent.io>, John Roesler <vvcephei@apache.org>
@cemo

cemo commented May 26, 2020

@mjarvie when will this fix be released?

@mjsax
Member

mjsax commented May 27, 2020

As you can see from the ticket (https://issues.apache.org/jira/browse/KAFKA-9127) the fix will be included in 2.4.2, 2.5.1 and 2.6.0. There is only a timeline for 2.6.0 atm with a target release date of June 24, 2020: https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan

@cemo

cemo commented May 27, 2020

@mjsax I was actually referring to 2.5.1. 😇
