Skip to content
Closed
24 changes: 18 additions & 6 deletions clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

import java.io.Closeable;
import java.net.InetAddress;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
Expand Down Expand Up @@ -93,12 +95,22 @@ public static List<InetSocketAddress> parseAndValidateAddresses(List<String> url

/**
 * Closes {@code c} without letting any exception propagate; the first failure (if any)
 * is recorded into {@code firstException} and logged.
 *
 * Fix: the merged diff left both the old inline try/catch AND the new delegation in
 * place, which would have invoked {@code c.close()} twice. Only the delegation to the
 * {@code Runnable} overload is kept; the checked {@link IOException} is wrapped in an
 * {@link UncheckedIOException} so it can cross the {@code Runnable} boundary.
 *
 * @param c the resource to close; may be null, in which case this is a no-op
 * @param name human-readable name used in the error log message
 * @param firstException holds the first close failure seen across multiple calls
 */
public static void closeQuietly(Closeable c, String name, AtomicReference<Throwable> firstException) {
    if (c != null) {
        closeQuietly((Runnable) () -> {
            try {
                c.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }, name, firstException);
    }
}

/**
 * Runs {@code closeFn}, swallowing anything it throws. The failure is logged and,
 * if it is the first one observed, stored into {@code firstException} so the caller
 * can rethrow it after all resources have been given a chance to close.
 *
 * @param closeFn the close action to run
 * @param name human-readable name used in the error log message
 * @param firstException accumulator for the first failure across multiple calls
 */
public static void closeQuietly(Runnable closeFn, String name, AtomicReference<Throwable> firstException) {
    try {
        closeFn.run();
    } catch (Throwable error) {
        // Log every failure, but only the first one wins the accumulator slot.
        log.error("Failed to close " + name, error);
        firstException.compareAndSet(null, error);
    }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ public FetchSessionHandler(LogContext logContext, int node) {
this.node = node;
}

public int sessionId() {
Comment thread
ijuma marked this conversation as resolved.
Outdated
return nextMetadata.sessionId();
}

/**
* All of the partitions which exist in the fetch request session.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2134,14 +2134,23 @@ private ClusterResourceListeners configureClusterResourceListeners(Deserializer<
private void close(long timeoutMs, boolean swallowException) {
log.trace("Closing the Kafka consumer");
AtomicReference<Throwable> firstException = new AtomicReference<>();
try {
if (coordinator != null)
coordinator.close(time.timer(Math.min(timeoutMs, requestTimeoutMs)));
} catch (Throwable t) {
firstException.compareAndSet(null, t);
log.error("Failed to close coordinator", t);

// at most requestTimeoutMs since all we do to close is one request
Timer coordinatorCloseTimer = time.timer(Math.min(requestTimeoutMs, timeoutMs));
if (coordinator != null)
ClientUtils.closeQuietly((Runnable) () -> coordinator.close(coordinatorCloseTimer), "consumer coordinator", firstException);
if (fetcher != null) {
long remainingMs;
if (timeoutMs > requestTimeoutMs)
remainingMs = timeoutMs - coordinatorCloseTimer.elapsedMs();
else
remainingMs = coordinatorCloseTimer.remainingMs();

long fetcherTimeoutMs = Math.min(requestTimeoutMs, remainingMs);
ClientUtils.closeQuietly((Runnable) () -> fetcher.close(time.timer(fetcherTimeoutMs)),
"consumer fetcher", firstException);
}
ClientUtils.closeQuietly(fetcher, "fetcher", firstException);

ClientUtils.closeQuietly(interceptors, "consumer interceptors", firstException);
ClientUtils.closeQuietly(metrics, "consumer metrics", firstException);
ClientUtils.closeQuietly(client, "consumer network client", firstException);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.FetchMetadata;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.IsolationLevel;
Expand Down Expand Up @@ -1516,11 +1517,33 @@ private static String partitionLeadMetricName(TopicPartition tp) {

}

@Override
public void close() {
private void closeFetchSessions(Timer timer) {
Cluster cluster = metadata.fetch();

for (Map.Entry<Integer, FetchSessionHandler> entry : sessionHandlers.entrySet()) {
Integer nodeId = entry.getKey();
int sessionId = entry.getValue().sessionId();
FetchMetadata closeSessionMetadata = new FetchMetadata(sessionId, FetchMetadata.FINAL_EPOCH);

final FetchRequest.Builder closeSessionRequest = FetchRequest.Builder
.forConsumer(this.maxWaitMs, this.minBytes, Collections.emptyMap())
.metadata(closeSessionMetadata);
client.send(cluster.nodeById(nodeId), closeSessionRequest);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we have to do something with the futures returned by send? How do we know if the requests completed? Also, cc @hachikuji.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had discussed offline that there is no need to wait and ensure the requests completed. It makes sense to make it best effort, otherwise it can slow down client termination and I assume people prefer the client terminates quicker rather than make sure no unused sessions are in the broker

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should always document things like that so that people reading the code are aware of it. My point was more subtle though, how do we make sure we actually sent the request?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More specifically, there's no point in having this if we close the network client before we even get to the point where the request has reached the network layer.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I did not know that was a possibility. I'll dive in.
Fair point with the documentation

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It probably makes sense to wait a short time, but not very long, to send the request to the broker. I'm not sure how that interacts with the rest of the client close logic

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, good catch @ijuma. This is the code for closing the KafkaConsumer:

ClientUtils.closeQuietly(fetcher, "fetcher", firstException);
ClientUtils.closeQuietly(interceptors, "consumer interceptors", firstException);
ClientUtils.closeQuietly(metrics, "consumer metrics", firstException);
ClientUtils.closeQuietly(client, "consumer network client", firstException);
ClientUtils.closeQuietly(keyDeserializer, "consumer key deserializer", firstException);
ClientUtils.closeQuietly(valueDeserializer, "consumer value deserializer", firstException);

Meaning there's a pretty big chance the client is closed before the request are sent.

But then I read the Javadoc of #send()

Send a new request. Note that the request is not actually transmitted on the
     * network until one of the {@link #poll(long)} variants is invoked. At this
     * point the request will either be transmitted successfully or will fail.
     * Use the returned future to obtain the result of the send. Note that there is no
     * need to check for disconnects explicitly on the {@link ClientResponse} object;
     * instead, the future will be failed with a {@link DisconnectException}.

Meaning this code doesn't transmit any requests since poll is not called at all...

I see two approaches to this

  1. Call poll(Long timeout) with the request.timeout.ms configured timeout after the for loop. This will give all requests a grand total of timeout time to be sent. This call will block for a total maximum of request.timeout.ms (default is 30 seconds which seems high to me for this use case). Maybe a total timeout of N seconds, where N is the number of requests?
  2. Call poll((RequestFuture<?> future, long timeout) individually for each sent close request. This blocks on each call, so it's questionable what the timeout should be (1 second?)

In both cases, we will slow down the closing of the consumer for a bit, as we will have to block as far as I can tell.

As a sidenote - this code would also need to handle exceptions as it currently doesn't. My idea is to only catch & log the error.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cmccabe @ijuma what do you believe is the proper approach?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The consumer close code needs to block until the fetch sessions are closed. KafkaConsumer has close() and close(long timeout, TimeUnit timeUnit). In the case of the latter one, we don't want to wait longer than the specified time.

}
client.poll(timer);
}

/**
 * Closes this fetcher within the given time bound: drops any fetched-but-undelivered
 * records, releases decompression buffers, and (best effort) asks each broker to
 * close the fetch session this fetcher created.
 *
 * @param timer bounds how long the session-close requests may take
 */
public void close(Timer timer) {
    // Discard records that were fetched but never handed to the user.
    if (nextInLineRecords != null)
        nextInLineRecords.drain();
    decompressionBufferSupplier.close();
    // Must run before clearing the handlers: it reads sessionHandlers to know
    // which sessions to close on which nodes.
    closeFetchSessions(timer);
    sessionHandlers.clear();
}

/**
 * Closes this fetcher with an effectively unbounded timeout; delegates to
 * {@link #close(Timer)}.
 */
@Override
public void close() {
    close(time.timer(Long.MAX_VALUE));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.FetchMetadata;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.FetchRequest;
Expand Down Expand Up @@ -309,6 +310,26 @@ public void testClearBufferedDataForTopicPartitions() {
assertFalse(fetcher.hasCompletedFetches());
}

@Test
public void testFetcherCloseClosesFetchSessionsInBroker() throws Exception {
subscriptions.assignFromUser(singleton(tp0));
subscriptions.seek(tp0, 0);
FetchResponse<MemoryRecords> fetchResponse = fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0);
client.prepareResponse(fetchResponse);
fetcher.sendFetches();
consumerClient.poll(time.timer(0));
assertEquals(0, consumerClient.pendingRequestCount(node));
this.fetcher.close(); // should send request to close the session

assertEquals(1, consumerClient.pendingRequestCount(node));
assertEquals(1, client.inFlightRequestCount());
assertEquals(1, consumerClient.pendingRequestCount(node));
FetchRequest sessionCloseReq = (FetchRequest) client.requests().peek().requestBuilder().build();
assertEquals(fetchResponse.sessionId(), sessionCloseReq.metadata().sessionId());
assertTrue(sessionCloseReq.fetchData().isEmpty());
assertEquals(FetchMetadata.FINAL_EPOCH, sessionCloseReq.metadata().epoch()); // final epoch indicates we want to close the session
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also check that there are no partitions contained in the request?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also we should match the fetch session with the earlier one

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely. Done

}

@Test
public void testFetchSkipsBlackedOutNodes() {
subscriptions.assignFromUser(singleton(tp0));
Expand Down
16 changes: 10 additions & 6 deletions tests/kafkatest/utils/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,29 @@ def _kafka_jar_versions(proc_string):
- kafka-streams-1.0.0-SNAPSHOT.jar
- kafka-streams-0.11.0.0-SNAPSHOT.jar
"""
def match(regex_pattern):
# return the result from the first regex group
return [result[0] if isinstance(result, tuple) else result
for result in re.findall(regex_pattern, proc_string)]

# Pattern example: kafka_2.11-1.0.0-SNAPSHOT.jar (we have to be careful not to partially match the 4 segment version string)
versions = re.findall("kafka_[0-9]+\.[0-9]+-([0-9]+\.[0-9]+\.[0-9]+)[\.-][a-zA-z]", proc_string)
versions = match("kafka_[0-9]+\.[0-9]+-([0-9]+\.[0-9]+\.[0-9]+(-[A-Z]+)?)")

# Pattern example: kafka_2.11-0.11.0.0-SNAPSHOT.jar
versions.extend(re.findall("kafka_[0-9]+\.[0-9]+-([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)", proc_string))
versions.extend(match("kafka_[0-9]+\.[0-9]+-([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+(-[A-Z]+)?)"))

# Pattern example: kafka-1.0.0/bin/../libs/* (i.e. the JARs are not listed explicitly, we have to be careful not to
# partially match the 4 segment version)
versions.extend(re.findall("kafka-([0-9]+\.[0-9]+\.[0-9]+)/", proc_string))
versions.extend(match("kafka-([0-9]+\.[0-9]+\.[0-9]+)/"))

# Pattern example: kafka-0.11.0.0/bin/../libs/* (i.e. the JARs are not listed explicitly)
versions.extend(re.findall("kafka-([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)", proc_string))
versions.extend(match("kafka-([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)"))

# Pattern example: kafka-streams-1.0.0-SNAPSHOT.jar (we have to be careful not to partially match the 4 segment version string)
versions.extend(re.findall("kafka-[a-z]+-([0-9]+\.[0-9]+\.[0-9]+)[\.-][a-zA-z]", proc_string))
versions.extend(match("kafka-[a-z]+-([0-9]+\.[0-9]+\.[0-9]+(-[A-Z]+)?)"))

# Pattern example: kafka-streams-0.11.0.0-SNAPSHOT.jar
versions.extend(re.findall("kafka-[a-z]+-([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)", proc_string))
versions.extend(match("kafka-[a-z]+-([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+(-[A-Z]+)?)"))

return set(versions)

Expand Down