Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand Down Expand Up @@ -92,6 +93,7 @@
*/
public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
private final Logger log;
private final LogContext logContext;
private final ConsumerNetworkClient client;
private final Time time;
private final int minBytes;
Expand All @@ -110,6 +112,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
private final ExtendedDeserializer<K> keyDeserializer;
private final ExtendedDeserializer<V> valueDeserializer;
private final IsolationLevel isolationLevel;
private final Map<Integer, FetchSessionHandler> sessionHandlers;

private PartitionRecords nextInLineRecords = null;

Expand All @@ -131,6 +134,7 @@ public Fetcher(LogContext logContext,
long retryBackoffMs,
IsolationLevel isolationLevel) {
this.log = logContext.logger(Fetcher.class);
this.logContext = logContext;
this.time = time;
this.client = client;
this.metadata = metadata;
Expand All @@ -147,6 +151,7 @@ public Fetcher(LogContext logContext,
this.sensors = new FetchManagerMetrics(metrics, metricsRegistry);
this.retryBackoffMs = retryBackoffMs;
this.isolationLevel = isolationLevel;
this.sessionHandlers = new HashMap<>();

subscriptions.addListener(this);
}
Expand Down Expand Up @@ -181,36 +186,37 @@ public boolean hasCompletedFetches() {
return !completedFetches.isEmpty();
}

/**
 * Checks that a fetch response covers exactly the set of partitions that the
 * corresponding request asked for.
 *
 * Brokers are expected to always satisfy this; the check mainly guards against
 * hand-crafted mock responses in test code.
 *
 * @param request  the fetch request that was sent
 * @param response the fetch response received for it
 * @return true if the response's partition set equals the requested partition set
 */
private boolean matchesRequestedPartitions(FetchRequest.Builder request, FetchResponse response) {
    return response.responseData().keySet().equals(request.fetchData().keySet());
}

/**
* Set up a fetch request for any node for which we have assigned partitions and that
* doesn't already have an in-flight fetch or pending fetch data.
* @return number of fetches sent
*/
public int sendFetches() {
Map<Node, FetchRequest.Builder> fetchRequestMap = createFetchRequests();
for (Map.Entry<Node, FetchRequest.Builder> fetchEntry : fetchRequestMap.entrySet()) {
final FetchRequest.Builder request = fetchEntry.getValue();
final Node fetchTarget = fetchEntry.getKey();

log.debug("Sending {} fetch for partitions {} to broker {}", isolationLevel, request.fetchData().keySet(),
fetchTarget);
Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();
for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
final Node fetchTarget = entry.getKey();
final FetchSessionHandler.FetchRequestData data = entry.getValue();
final FetchRequest.Builder request = FetchRequest.Builder.
forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
.isolationLevel(isolationLevel)
.setMaxBytes(this.maxBytes)
.metadata(data.metadata())
.toForget(data.toForget());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the name can be more explicit? For example, forgetPartitions?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted a name that indicated that we want to forget the partitions, but that it hasn't been done yet. I'm open to suggestions, but toForget seemed nice and simple.

if (log.isDebugEnabled()) {
log.debug("Sending {} {} to broker {}", isolationLevel, data.toString(), fetchTarget);
}
client.send(fetchTarget, request)
.addListener(new RequestFutureListener<ClientResponse>() {
@Override
public void onSuccess(ClientResponse resp) {
FetchResponse response = (FetchResponse) resp.responseBody();
if (!matchesRequestedPartitions(request, response)) {
// obviously we expect the broker to always send us valid responses, so this check
// is mainly for test cases where mock fetch responses must be manually crafted.
log.warn("Ignoring fetch response containing partitions {} since it does not match " +
"the requested partitions {}", response.responseData().keySet(),
request.fetchData().keySet());
FetchSessionHandler handler = sessionHandlers.get(fetchTarget.id());
if (handler == null) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We lost the comment we had before, but it seemed useful. Maybe you can update it to be relevant to the new logic.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I will add a log message to FetchSessionHandler which will spell out this information

log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
fetchTarget.id());
return;
}
if (!handler.handleResponse(response)) {
return;
}

Expand All @@ -219,7 +225,7 @@ public void onSuccess(ClientResponse resp) {

for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
TopicPartition partition = entry.getKey();
long fetchOffset = request.fetchData().get(partition).fetchOffset;
long fetchOffset = data.sessionPartitions().get(partition).fetchOffset;
FetchResponse.PartitionData fetchData = entry.getValue();

log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
Expand All @@ -233,7 +239,10 @@ public void onSuccess(ClientResponse resp) {

@Override
public void onFailure(RuntimeException e) {
log.debug("Fetch request {} to {} failed", request.fetchData(), fetchTarget, e);
FetchSessionHandler handler = sessionHandlers.get(fetchTarget.id());
if (handler != null) {
handler.handleError(e);
}
}
});
}
Expand Down Expand Up @@ -772,42 +781,41 @@ private List<TopicPartition> fetchablePartitions() {
* Create fetch requests for all nodes for which we have assigned partitions
* that have no existing requests in flight.
*/
private Map<Node, FetchRequest.Builder> createFetchRequests() {
private Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {
// create the fetch info
Cluster cluster = metadata.fetch();
Map<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> fetchable = new LinkedHashMap<>();
Map<Node, FetchSessionHandler.Builder> fetchable = new LinkedHashMap<>();
for (TopicPartition partition : fetchablePartitions()) {
Node node = cluster.leaderFor(partition);
if (node == null) {
metadata.requestUpdate();
} else if (!this.client.hasPendingRequests(node)) {
// if there is a leader and no in-flight requests, issue a new fetch
LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
if (fetch == null) {
fetch = new LinkedHashMap<>();
fetchable.put(node, fetch);
FetchSessionHandler.Builder builder = fetchable.get(node);
if (builder == null) {
FetchSessionHandler handler = sessionHandlers.get(node.id());
if (handler == null) {
handler = new FetchSessionHandler(logContext, node.id());
sessionHandlers.put(node.id(), handler);
}
builder = handler.newBuilder();
fetchable.put(node, builder);
}

long position = this.subscriptions.position(partition);
fetch.put(partition, new FetchRequest.PartitionData(position, FetchRequest.INVALID_LOG_START_OFFSET,
this.fetchSize));
builder.add(partition, new FetchRequest.PartitionData(position, FetchRequest.INVALID_LOG_START_OFFSET,
this.fetchSize));
log.debug("Added {} fetch request for partition {} at offset {} to node {}", isolationLevel,
partition, position, node);
partition, position, node);
} else {
log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node);
}
}

// create the fetches
Map<Node, FetchRequest.Builder> requests = new HashMap<>();
for (Map.Entry<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
Node node = entry.getKey();
FetchRequest.Builder fetch = FetchRequest.Builder.forConsumer(this.maxWaitMs, this.minBytes,
entry.getValue(), isolationLevel)
.setMaxBytes(this.maxBytes);
requests.put(node, fetch);
Map<Node, FetchSessionHandler.FetchRequestData> reqs = new LinkedHashMap<>();
for (Map.Entry<Node, FetchSessionHandler.Builder> entry : fetchable.entrySet()) {
reqs.put(entry.getKey(), entry.getValue().build());
}
return requests;
return reqs;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.errors;

/**
 * Thrown when a fetch request refers to a fetch session ID that the broker does not
 * recognize. Maps to the {@code FETCH_SESSION_ID_NOT_FOUND} (70) error code and is
 * retriable, so callers may retry the operation.
 */
public class FetchSessionIdNotFoundException extends RetriableException {
    private static final long serialVersionUID = 1L;

    public FetchSessionIdNotFoundException() {
    }

    public FetchSessionIdNotFoundException(String message) {
        super(message);
    }

    /**
     * Preserves the underlying cause for diagnostics instead of dropping it.
     *
     * @param message the detail message
     * @param cause   the exception that triggered this error
     */
    public FetchSessionIdNotFoundException(String message, Throwable cause) {
        super(message, cause);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.errors;

/**
 * Thrown when a fetch request carries a fetch session epoch that the broker considers
 * invalid. Maps to the {@code INVALID_FETCH_SESSION_EPOCH} (71) error code and is
 * retriable, so callers may retry the operation.
 */
public class InvalidFetchSessionEpochException extends RetriableException {
    private static final long serialVersionUID = 1L;

    public InvalidFetchSessionEpochException() {
    }

    public InvalidFetchSessionEpochException(String message) {
        super(message);
    }

    /**
     * Preserves the underlying cause for diagnostics instead of dropping it.
     *
     * @param message the detail message
     * @param cause   the exception that triggered this error
     */
    public InvalidFetchSessionEpochException(String message, Throwable cause) {
        super(message, cause);
    }
}
16 changes: 16 additions & 0 deletions clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.kafka.common.errors.DelegationTokenExpiredException;
import org.apache.kafka.common.errors.DelegationTokenNotFoundException;
import org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException;
import org.apache.kafka.common.errors.FetchSessionIdNotFoundException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
Expand All @@ -38,6 +39,7 @@
import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
import org.apache.kafka.common.errors.InvalidCommitOffsetSizeException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidFetchSessionEpochException;
import org.apache.kafka.common.errors.InvalidFetchSizeException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidPartitionsException;
Expand Down Expand Up @@ -608,6 +610,20 @@ public ApiException build(String message) {
public ApiException build(String message) {
return new GroupIdNotFoundException(message);
}
}),
FETCH_SESSION_ID_NOT_FOUND(70, "The fetch session ID was not found",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new FetchSessionIdNotFoundException(message);
}
}),
INVALID_FETCH_SESSION_EPOCH(71, "The fetch session epoch is invalid",
new ApiExceptionBuilder() {
@Override
public ApiException build(String message) {
return new InvalidFetchSessionEpochException(message);
}
});

private interface ApiExceptionBuilder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ public Long getOrElse(Field.Int64 field, long alternative) {
return alternative;
}

/**
 * Returns the value of the given int16 field, or {@code alternative} when the field
 * is not present. Mirrors the Int64/Int32 overloads of this method.
 *
 * @param field       the int16 field to look up
 * @param alternative the value to return when the field is absent
 * @return the field's value, or {@code alternative} if the field is missing
 */
public Short getOrElse(Field.Int16 field, short alternative) {
    // Guard clause: fall back immediately when the field is absent. Note the
    // present-case result is returned boxed, exactly as getShort() provides it.
    if (!hasField(field.name))
        return alternative;
    return getShort(field.name);
}

public Integer getOrElse(Field.Int32 field, int alternative) {
if (hasField(field.name))
return getInt(field.name);
Expand Down
Loading