diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java index 4d933243655ab..97aade02b5473 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java @@ -29,6 +29,8 @@ import java.io.Closeable; import java.net.InetAddress; +import java.io.IOException; +import java.io.UncheckedIOException; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; @@ -93,12 +95,22 @@ public static List parseAndValidateAddresses(List url public static void closeQuietly(Closeable c, String name, AtomicReference firstException) { if (c != null) { - try { - c.close(); - } catch (Throwable t) { - firstException.compareAndSet(null, t); - log.error("Failed to close " + name, t); - } + closeQuietly((Runnable) () -> { + try { + c.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }, name, firstException); + } + } + + public static void closeQuietly(Runnable closeFn, String name, AtomicReference firstException) { + try { + closeFn.run(); + } catch (Throwable t) { + firstException.compareAndSet(null, t); + log.error("Failed to close " + name, t); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java index 30ae65f2309c4..7c131671924e7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java @@ -67,6 +67,10 @@ public FetchSessionHandler(LogContext logContext, int node) { this.node = node; } + public int sessionId() { + return nextMetadata.sessionId(); + } + /** * All of the partitions which exist in the fetch request session. */ diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 5c673a58c10f5..bb79f320a81a0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -2134,14 +2134,23 @@ private ClusterResourceListeners configureClusterResourceListeners(Deserializer< private void close(long timeoutMs, boolean swallowException) { log.trace("Closing the Kafka consumer"); AtomicReference firstException = new AtomicReference<>(); - try { - if (coordinator != null) - coordinator.close(time.timer(Math.min(timeoutMs, requestTimeoutMs))); - } catch (Throwable t) { - firstException.compareAndSet(null, t); - log.error("Failed to close coordinator", t); + + // at most requestTimeoutMs since all we do to close is one request + Timer coordinatorCloseTimer = time.timer(Math.min(requestTimeoutMs, timeoutMs)); + if (coordinator != null) + ClientUtils.closeQuietly((Runnable) () -> coordinator.close(coordinatorCloseTimer), "consumer coordinator", firstException); + if (fetcher != null) { + long remainingMs; + if (timeoutMs > requestTimeoutMs) + remainingMs = timeoutMs - coordinatorCloseTimer.elapsedMs(); + else + remainingMs = coordinatorCloseTimer.remainingMs(); + + long fetcherTimeoutMs = Math.min(requestTimeoutMs, remainingMs); + ClientUtils.closeQuietly((Runnable) () -> fetcher.close(time.timer(fetcherTimeoutMs)), + "consumer fetcher", firstException); } - ClientUtils.closeQuietly(fetcher, "fetcher", firstException); + ClientUtils.closeQuietly(interceptors, "consumer interceptors", firstException); ClientUtils.closeQuietly(metrics, "consumer metrics", firstException); ClientUtils.closeQuietly(client, "consumer network client", firstException); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 265fc99721d11..e0ed56744c9fb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -54,6 +54,7 @@ import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.Records; import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.FetchMetadata; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.FetchResponse; import org.apache.kafka.common.requests.IsolationLevel; @@ -1516,11 +1517,33 @@ private static String partitionLeadMetricName(TopicPartition tp) { } - @Override - public void close() { + private void closeFetchSessions(Timer timer) { + Cluster cluster = metadata.fetch(); + + for (Map.Entry entry : sessionHandlers.entrySet()) { + Integer nodeId = entry.getKey(); + int sessionId = entry.getValue().sessionId(); + FetchMetadata closeSessionMetadata = new FetchMetadata(sessionId, FetchMetadata.FINAL_EPOCH); + + final FetchRequest.Builder closeSessionRequest = FetchRequest.Builder + .forConsumer(this.maxWaitMs, this.minBytes, Collections.emptyMap()) + .metadata(closeSessionMetadata); + client.send(cluster.nodeById(nodeId), closeSessionRequest); + } + client.poll(timer); + } + + public void close(Timer timer) { if (nextInLineRecords != null) nextInLineRecords.drain(); decompressionBufferSupplier.close(); + closeFetchSessions(timer); + sessionHandlers.clear(); + } + + @Override + public void close() { + close(time.timer(Long.MAX_VALUE)); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 52b78e3de626e..bcf86bad3108a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -64,6 +64,7 @@ import org.apache.kafka.common.record.Records; import org.apache.kafka.common.record.SimpleRecord; import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.FetchMetadata; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.ApiVersionsResponse; import org.apache.kafka.common.requests.FetchRequest; @@ -309,6 +310,26 @@ public void testClearBufferedDataForTopicPartitions() { assertFalse(fetcher.hasCompletedFetches()); } + @Test + public void testFetcherCloseClosesFetchSessionsInBroker() throws Exception { + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); + FetchResponse fetchResponse = fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0); + client.prepareResponse(fetchResponse); + fetcher.sendFetches(); + consumerClient.poll(time.timer(0)); + assertEquals(0, consumerClient.pendingRequestCount(node)); + this.fetcher.close(); // should send request to close the session + + assertEquals(1, consumerClient.pendingRequestCount(node)); + assertEquals(1, client.inFlightRequestCount()); + assertEquals(1, consumerClient.pendingRequestCount(node)); + FetchRequest sessionCloseReq = (FetchRequest) client.requests().peek().requestBuilder().build(); + assertEquals(fetchResponse.sessionId(), sessionCloseReq.metadata().sessionId()); + assertTrue(sessionCloseReq.fetchData().isEmpty()); + assertEquals(FetchMetadata.FINAL_EPOCH, sessionCloseReq.metadata().epoch()); // final epoch indicates we want to close the session + } + @Test public void testFetchSkipsBlackedOutNodes() { subscriptions.assignFromUser(singleton(tp0)); diff --git a/tests/kafkatest/utils/util.py b/tests/kafkatest/utils/util.py index dd2027331dd70..e391ee9c1c511 100644 --- a/tests/kafkatest/utils/util.py +++ b/tests/kafkatest/utils/util.py @@ -33,25 +33,29 @@ def _kafka_jar_versions(proc_string): - kafka-streams-1.0.0-SNAPSHOT.jar - kafka-streams-0.11.0.0-SNAPSHOT.jar """ + def match(regex_pattern): + # return the result from the first regex group + return [result[0] if isinstance(result, tuple) else result + for result in re.findall(regex_pattern, proc_string)] # Pattern example: kafka_2.11-1.0.0-SNAPSHOT.jar (we have to be careful not to partially match the 4 segment version string) - versions = re.findall("kafka_[0-9]+\.[0-9]+-([0-9]+\.[0-9]+\.[0-9]+)[\.-][a-zA-z]", proc_string) + versions = match("kafka_[0-9]+\.[0-9]+-([0-9]+\.[0-9]+\.[0-9]+(-[A-Z]+)?)") # Pattern example: kafka_2.11-0.11.0.0-SNAPSHOT.jar - versions.extend(re.findall("kafka_[0-9]+\.[0-9]+-([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)", proc_string)) + versions.extend(match("kafka_[0-9]+\.[0-9]+-([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+(-[A-Z]+)?)")) # Pattern example: kafka-1.0.0/bin/../libs/* (i.e. the JARs are not listed explicitly, we have to be careful not to # partially match the 4 segment version) - versions.extend(re.findall("kafka-([0-9]+\.[0-9]+\.[0-9]+)/", proc_string)) + versions.extend(match("kafka-([0-9]+\.[0-9]+\.[0-9]+)/")) # Pattern example: kafka-0.11.0.0/bin/../libs/* (i.e. the JARs are not listed explicitly) - versions.extend(re.findall("kafka-([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)", proc_string)) + versions.extend(match("kafka-([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)")) # Pattern example: kafka-streams-1.0.0-SNAPSHOT.jar (we have to be careful not to partially match the 4 segment version string) - versions.extend(re.findall("kafka-[a-z]+-([0-9]+\.[0-9]+\.[0-9]+)[\.-][a-zA-z]", proc_string)) + versions.extend(match("kafka-[a-z]+-([0-9]+\.[0-9]+\.[0-9]+(-[A-Z]+)?)")) # Pattern example: kafka-streams-0.11.0.0-SNAPSHOT.jar - versions.extend(re.findall("kafka-[a-z]+-([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)", proc_string)) + versions.extend(match("kafka-[a-z]+-([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+(-[A-Z]+)?)")) return set(versions)