From f34703cef240f00b00c2725fafb42ed795e2e429 Mon Sep 17 00:00:00 2001 From: Walker Carlson Date: Mon, 15 Mar 2021 12:06:54 -0700 Subject: [PATCH] fix:timeout issue --- .../src/main/java/org/apache/kafka/streams/KafkaStreams.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 77eebe8ad4cdf..798f9e7907e00 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -1031,7 +1031,6 @@ public Optional removeStreamThread(final Duration timeout) { } private Optional removeStreamThread(final long timeoutMs) throws TimeoutException { - final long begin = time.milliseconds(); boolean timeout = false; if (isRunningOrRebalancing()) { synchronized (changeThreadCount) { @@ -1044,7 +1043,7 @@ private Optional removeStreamThread(final long timeoutMs) throws Timeout streamThread.requestLeaveGroupDuringShutdown(); streamThread.shutdown(); if (!streamThread.getName().equals(Thread.currentThread().getName())) { - if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs - begin)) { + if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) { log.warn("Thread " + streamThread.getName() + " did not shutdown in the allotted time"); timeout = true; // Don't remove from threads until shutdown is complete. We will trim it from the @@ -1066,7 +1065,7 @@ private Optional removeStreamThread(final long timeoutMs) throws Timeout new RemoveMembersFromConsumerGroupOptions(membersToRemove) ); try { - removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(timeoutMs - begin, TimeUnit.MILLISECONDS); + removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(timeoutMs, TimeUnit.MILLISECONDS); } catch (final java.util.concurrent.TimeoutException e) { log.error("Could not remove static member {} from consumer group {} due to a timeout: {}", groupInstanceID.get(), config.getString(StreamsConfig.APPLICATION_ID_CONFIG), e);