diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 77eebe8ad4cdf..798f9e7907e00 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -1031,7 +1031,6 @@ public Optional removeStreamThread(final Duration timeout) { } private Optional removeStreamThread(final long timeoutMs) throws TimeoutException { - final long begin = time.milliseconds(); boolean timeout = false; if (isRunningOrRebalancing()) { synchronized (changeThreadCount) { @@ -1044,7 +1043,7 @@ private Optional removeStreamThread(final long timeoutMs) throws Timeout streamThread.requestLeaveGroupDuringShutdown(); streamThread.shutdown(); if (!streamThread.getName().equals(Thread.currentThread().getName())) { - if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs - begin)) { + if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) { log.warn("Thread " + streamThread.getName() + " did not shutdown in the allotted time"); timeout = true; // Don't remove from threads until shutdown is complete. We will trim it from the @@ -1066,7 +1065,7 @@ private Optional removeStreamThread(final long timeoutMs) throws Timeout new RemoveMembersFromConsumerGroupOptions(membersToRemove) ); try { - removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(timeoutMs - begin, TimeUnit.MILLISECONDS); + removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(timeoutMs, TimeUnit.MILLISECONDS); } catch (final java.util.concurrent.TimeoutException e) { log.error("Could not remove static member {} from consumer group {} due to a timeout: {}", groupInstanceID.get(), config.getString(StreamsConfig.APPLICATION_ID_CONFIG), e);