From f8df762f628197bb4c3a8176a2b5a277f219b031 Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Mon, 26 Nov 2018 13:33:05 +0000 Subject: [PATCH 1/6] kafkatest: Match and compare `*-SNAPSHOT` versions in KafkaVersionTest --- tests/kafkatest/utils/util.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/tests/kafkatest/utils/util.py b/tests/kafkatest/utils/util.py index dd2027331dd70..e391ee9c1c511 100644 --- a/tests/kafkatest/utils/util.py +++ b/tests/kafkatest/utils/util.py @@ -33,25 +33,29 @@ def _kafka_jar_versions(proc_string): - kafka-streams-1.0.0-SNAPSHOT.jar - kafka-streams-0.11.0.0-SNAPSHOT.jar """ + def match(regex_pattern): + # return the result from the first regex group + return [result[0] if isinstance(result, tuple) else result + for result in re.findall(regex_pattern, proc_string)] # Pattern example: kafka_2.11-1.0.0-SNAPSHOT.jar (we have to be careful not to partially match the 4 segment version string) - versions = re.findall("kafka_[0-9]+\.[0-9]+-([0-9]+\.[0-9]+\.[0-9]+)[\.-][a-zA-z]", proc_string) + versions = match("kafka_[0-9]+\.[0-9]+-([0-9]+\.[0-9]+\.[0-9]+(-[A-Z]+)?)") # Pattern example: kafka_2.11-0.11.0.0-SNAPSHOT.jar - versions.extend(re.findall("kafka_[0-9]+\.[0-9]+-([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)", proc_string)) + versions.extend(match("kafka_[0-9]+\.[0-9]+-([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+(-[A-Z]+)?)")) # Pattern example: kafka-1.0.0/bin/../libs/* (i.e. the JARs are not listed explicitly, we have to be careful not to # partially match the 4 segment version) - versions.extend(re.findall("kafka-([0-9]+\.[0-9]+\.[0-9]+)/", proc_string)) + versions.extend(match("kafka-([0-9]+\.[0-9]+\.[0-9]+)/")) # Pattern example: kafka-0.11.0.0/bin/../libs/* (i.e. the JARs are not listed explicitly) - versions.extend(re.findall("kafka-([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)", proc_string)) + versions.extend(match("kafka-([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)")) # Pattern example: kafka-streams-1.0.0-SNAPSHOT.jar (we have to be careful not to partially match the 4 segment version string) - versions.extend(re.findall("kafka-[a-z]+-([0-9]+\.[0-9]+\.[0-9]+)[\.-][a-zA-z]", proc_string)) + versions.extend(match("kafka-[a-z]+-([0-9]+\.[0-9]+\.[0-9]+(-[A-Z]+)?)")) # Pattern example: kafka-streams-0.11.0.0-SNAPSHOT.jar - versions.extend(re.findall("kafka-[a-z]+-([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)", proc_string)) + versions.extend(match("kafka-[a-z]+-([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+(-[A-Z]+)?)")) return set(versions) From 86baa713a3aa921c99b6a113623cd76ed7c72eae Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Fri, 20 Jul 2018 10:44:25 -0700 Subject: [PATCH 2/6] KAFKA-7109: Close cached fetch sessions in the broker on consumer close Previously, the consumer's incremental fetch sessions would time out once the consumer was gone. --- .../kafka/clients/FetchSessionHandler.java | 4 ++++ .../clients/consumer/internals/Fetcher.java | 17 +++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java index 30ae65f2309c4..7c131671924e7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java +++ b/clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java @@ -67,6 +67,10 @@ public FetchSessionHandler(LogContext logContext, int node) { this.node = node; } + public int sessionId() { + return nextMetadata.sessionId(); + } + /** * All of the partitions which exist in the fetch request session. */ diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 265fc99721d11..728556d4abb13 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -54,6 +54,7 @@ import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.Records; import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.FetchMetadata; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.FetchResponse; import org.apache.kafka.common.requests.IsolationLevel; @@ -1516,11 +1517,27 @@ private static String partitionLeadMetricName(TopicPartition tp) { } + private void closeFetchSessions() { + Cluster cluster = metadata.fetch(); + + for (Map.Entry entry : sessionHandlers.entrySet()) { + Integer nodeId = entry.getKey(); + int sessionId = entry.getValue().sessionId(); + FetchMetadata closeSessionMetadata = new FetchMetadata(sessionId, FetchMetadata.FINAL_EPOCH); + + final FetchRequest.Builder closeSessionRequest = FetchRequest.Builder + .forConsumer(this.maxWaitMs, this.minBytes, Collections.emptyMap()) + .metadata(closeSessionMetadata); + client.send(cluster.nodeById(nodeId), closeSessionRequest); + } + } + @Override public void close() { if (nextInLineRecords != null) nextInLineRecords.drain(); decompressionBufferSupplier.close(); + closeFetchSessions(); } } From ee577ba5cb311391be5e4cf7fdccaf0544c59cfa Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Wed, 25 Jul 2018 11:24:00 -0700 Subject: [PATCH 3/6] KAFKA-7109: Add test for closing fetch sessions in broker --- .../clients/consumer/internals/Fetcher.java | 1 + .../consumer/internals/FetcherTest.java | 20 +++++++++++++++++++ 2 files changed, 21 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 728556d4abb13..0b78bc81f8e5c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -1538,6 +1538,7 @@ public void close() { nextInLineRecords.drain(); decompressionBufferSupplier.close(); closeFetchSessions(); + sessionHandlers.clear(); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 52b78e3de626e..38e07936ca16a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -64,6 +64,7 @@ import org.apache.kafka.common.record.Records; import org.apache.kafka.common.record.SimpleRecord; import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.FetchMetadata; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.ApiVersionsResponse; import org.apache.kafka.common.requests.FetchRequest; @@ -309,6 +310,25 @@ public void testClearBufferedDataForTopicPartitions() { assertFalse(fetcher.hasCompletedFetches()); } + @Test + public void testFetcherCloseClosesFetchSessionsInBroker() throws Exception { + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); + client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0)); + fetcher.sendFetches(); + consumerClient.poll(time.timer(0)); + assertEquals(0, consumerClient.pendingRequestCount(node)); + this.fetcher.close(); // should send request to close the session + + assertEquals(1, consumerClient.pendingRequestCount(node)); + assertEquals(0, client.inFlightRequestCount()); + consumerClient.poll(time.timer(0)); + assertEquals(1, client.inFlightRequestCount()); + assertEquals(1, consumerClient.pendingRequestCount(node)); + FetchRequest sessionCloseReq = (FetchRequest) client.requests().peek().requestBuilder().build(); + assertEquals(FetchMetadata.FINAL_EPOCH, sessionCloseReq.metadata().epoch()); // final epoch indicates we want to close the session + } + @Test public void testFetchSkipsBlackedOutNodes() { subscriptions.assignFromUser(singleton(tp0)); From dfa222f4bb18ad287064daab9b600c3f1b10f8f7 Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Fri, 27 Jul 2018 22:28:52 +0100 Subject: [PATCH 4/6] KAFKA-7109: Improve fetcher session handler close test --- .../apache/kafka/clients/consumer/internals/FetcherTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 38e07936ca16a..57cb812769a3e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -314,7 +314,8 @@ public void testClearBufferedDataForTopicPartitions() { public void testFetcherCloseClosesFetchSessionsInBroker() throws Exception { subscriptions.assignFromUser(singleton(tp0)); subscriptions.seek(tp0, 0); - client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0)); + FetchResponse fetchResponse = fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0); + client.prepareResponse(fetchResponse); fetcher.sendFetches(); consumerClient.poll(time.timer(0)); assertEquals(0, consumerClient.pendingRequestCount(node)); @@ -326,6 +327,8 @@ public void testFetcherCloseClosesFetchSessionsInBroker() throws Exception { assertEquals(1, client.inFlightRequestCount()); assertEquals(1, consumerClient.pendingRequestCount(node)); FetchRequest sessionCloseReq = (FetchRequest) client.requests().peek().requestBuilder().build(); + assertEquals(fetchResponse.sessionId(), sessionCloseReq.metadata().sessionId()); + assertTrue(sessionCloseReq.fetchData().isEmpty()); assertEquals(FetchMetadata.FINAL_EPOCH, sessionCloseReq.metadata().epoch()); // final epoch indicates we want to close the session } From 94417ace3e682fa8e45616e682d603739bc4df13 Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Thu, 11 Oct 2018 13:22:13 +0100 Subject: [PATCH 5/6] KAFKA-7109: Add timeout for closing fetching sessions --- .../kafka/clients/consumer/KafkaConsumer.java | 11 ++++++++++- .../clients/consumer/internals/Fetcher.java | 16 ++++++++++++---- .../clients/consumer/internals/FetcherTest.java | 2 -- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 5c673a58c10f5..9c890b5ceb577 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -2134,6 +2134,8 @@ private ClusterResourceListeners configureClusterResourceListeners(Deserializer< private void close(long timeoutMs, boolean swallowException) { log.trace("Closing the Kafka consumer"); AtomicReference firstException = new AtomicReference<>(); + + long closeStart = time.milliseconds(); try { if (coordinator != null) coordinator.close(time.timer(Math.min(timeoutMs, requestTimeoutMs))); @@ -2141,7 +2143,14 @@ private void close(long timeoutMs, boolean swallowException) { firstException.compareAndSet(null, t); log.error("Failed to close coordinator", t); } - ClientUtils.closeQuietly(fetcher, "fetcher", firstException); + long closeTimeoutLeft = timeoutMs - (time.milliseconds() - closeStart); + try { + if (fetcher != null) + fetcher.close(closeTimeoutLeft); + } catch (Throwable t) { + firstException.compareAndSet(null, t); + log.error("Failed to close Fetcher", t); + } ClientUtils.closeQuietly(interceptors, "consumer interceptors", firstException); ClientUtils.closeQuietly(metrics, "consumer metrics", firstException); ClientUtils.closeQuietly(client, "consumer network client", firstException); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 0b78bc81f8e5c..283675ff06fc1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -1517,7 +1517,7 @@ private static String partitionLeadMetricName(TopicPartition tp) { } - private void closeFetchSessions() { + private void closeFetchSessions(long timeoutMs) { Cluster cluster = metadata.fetch(); for (Map.Entry entry : sessionHandlers.entrySet()) { @@ -1530,15 +1530,23 @@ private void closeFetchSessions() { .metadata(closeSessionMetadata); client.send(cluster.nodeById(nodeId), closeSessionRequest); } + client.poll(timeoutMs); } - @Override - public void close() { + /** + * @param timeoutMs the maximum amount of time (in milliseconds) this method can block for + */ + public void close(long timeoutMs) { if (nextInLineRecords != null) nextInLineRecords.drain(); decompressionBufferSupplier.close(); - closeFetchSessions(); + closeFetchSessions(timeoutMs); sessionHandlers.clear(); } + @Override + public void close() { + close(Long.MAX_VALUE); + } + } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 57cb812769a3e..bcf86bad3108a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -322,8 +322,6 @@ public void testFetcherCloseClosesFetchSessionsInBroker() throws Exception { this.fetcher.close(); // should send request to close the session assertEquals(1, consumerClient.pendingRequestCount(node)); - assertEquals(0, client.inFlightRequestCount()); - consumerClient.poll(time.timer(0)); assertEquals(1, client.inFlightRequestCount()); assertEquals(1, consumerClient.pendingRequestCount(node)); FetchRequest sessionCloseReq = (FetchRequest) client.requests().peek().requestBuilder().build(); From dc33ae043136e122c71dece7598d89a91b0c2ada Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Sun, 14 Oct 2018 18:45:37 +0300 Subject: [PATCH 6/6] Use Timer class for timeouts --- .../org/apache/kafka/clients/ClientUtils.java | 24 +++++++++++---- .../kafka/clients/consumer/KafkaConsumer.java | 30 +++++++++---------- .../clients/consumer/internals/Fetcher.java | 13 ++++---- 3 files changed, 38 insertions(+), 29 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java index 4d933243655ab..97aade02b5473 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java @@ -29,6 +29,8 @@ import java.io.Closeable; import java.net.InetAddress; +import java.io.IOException; +import java.io.UncheckedIOException; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.ArrayList; @@ -93,12 +95,22 @@ public static List parseAndValidateAddresses(List url public static void closeQuietly(Closeable c, String name, AtomicReference firstException) { if (c != null) { - try { - c.close(); - } catch (Throwable t) { - firstException.compareAndSet(null, t); - log.error("Failed to close " + name, t); - } + closeQuietly((Runnable) () -> { + try { + c.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }, name, firstException); + } + } + + public static void closeQuietly(Runnable closeFn, String name, AtomicReference firstException) { + try { + closeFn.run(); + } catch (Throwable t) { + firstException.compareAndSet(null, t); + log.error("Failed to close " + name, t); } } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 9c890b5ceb577..bb79f320a81a0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -2135,22 +2135,22 @@ private void close(long timeoutMs, boolean swallowException) { log.trace("Closing the Kafka consumer"); AtomicReference firstException = new AtomicReference<>(); - long closeStart = time.milliseconds(); - try { - if (coordinator != null) - coordinator.close(time.timer(Math.min(timeoutMs, requestTimeoutMs))); - } catch (Throwable t) { - firstException.compareAndSet(null, t); - log.error("Failed to close coordinator", t); - } - long closeTimeoutLeft = timeoutMs - (time.milliseconds() - closeStart); - try { - if (fetcher != null) - fetcher.close(closeTimeoutLeft); - } catch (Throwable t) { - firstException.compareAndSet(null, t); - log.error("Failed to close Fetcher", t); + // at most requestTimeoutMs since all we do to close is one request + Timer coordinatorCloseTimer = time.timer(Math.min(requestTimeoutMs, timeoutMs)); + if (coordinator != null) + ClientUtils.closeQuietly((Runnable) () -> coordinator.close(coordinatorCloseTimer), "consumer coordinator", firstException); + if (fetcher != null) { + long remainingMs; + if (timeoutMs > requestTimeoutMs) + remainingMs = timeoutMs - coordinatorCloseTimer.elapsedMs(); + else + remainingMs = coordinatorCloseTimer.remainingMs(); + + long fetcherTimeoutMs = Math.min(requestTimeoutMs, remainingMs); + ClientUtils.closeQuietly((Runnable) () -> fetcher.close(time.timer(fetcherTimeoutMs)), + "consumer fetcher", firstException); } + ClientUtils.closeQuietly(interceptors, "consumer interceptors", firstException); ClientUtils.closeQuietly(metrics, "consumer metrics", firstException); ClientUtils.closeQuietly(client, "consumer network client", firstException); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 283675ff06fc1..e0ed56744c9fb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -1517,7 +1517,7 @@ private static String partitionLeadMetricName(TopicPartition tp) { } - private void closeFetchSessions(long timeoutMs) { + private void closeFetchSessions(Timer timer) { Cluster cluster = metadata.fetch(); for (Map.Entry entry : sessionHandlers.entrySet()) { @@ -1530,23 +1530,20 @@ private void closeFetchSessions(long timeoutMs) { .metadata(closeSessionMetadata); client.send(cluster.nodeById(nodeId), closeSessionRequest); } - client.poll(timeoutMs); + client.poll(timer); } - /** - * @param timeoutMs the maximum amount of time (in milliseconds) this method can block for - */ - public void close(long timeoutMs) { + public void close(Timer timer) { if (nextInLineRecords != null) nextInLineRecords.drain(); decompressionBufferSupplier.close(); - closeFetchSessions(timeoutMs); + closeFetchSessions(timer); sessionHandlers.clear(); } @Override public void close() { - close(Long.MAX_VALUE); + close(time.timer(Long.MAX_VALUE)); } }