Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,8 @@ public ApiKeys apiKey() {
return requestBuilder.apiKey();
}

public RequestHeader makeHeader() {
return new RequestHeader(requestBuilder.apiKey().id,
requestBuilder.version(), clientId, correlationId);
public RequestHeader makeHeader(short version) {
return new RequestHeader(requestBuilder.apiKey().id, version, clientId, correlationId);
}

public AbstractRequest.Builder<?> requestBuilder() {
Expand Down
21 changes: 12 additions & 9 deletions clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -280,43 +280,46 @@ private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long
if (!canSendRequest(nodeId))
throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
}
AbstractRequest request = null;
AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
try {
NodeApiVersions versionInfo = nodeApiVersions.get(nodeId);
short version;
// Note: if versionInfo is null, we have no server version information. This would be
// the case when sending the initial ApiVersionRequest which fetches the version
// information itself. It is also the case when discoverBrokerVersions is set to false.
if (versionInfo == null) {
version = builder.desiredOrLatestVersion();
if (discoverBrokerVersions && log.isTraceEnabled())
log.trace("No version information found when sending message of type {} to node {}. " +
"Assuming version {}.", clientRequest.apiKey(), nodeId, builder.version());
"Assuming version {}.", clientRequest.apiKey(), nodeId, version);
} else {
short version = versionInfo.usableVersion(clientRequest.apiKey());
builder.setVersion(version);
version = versionInfo.usableVersion(clientRequest.apiKey());
}
// The call to build may also throw UnsupportedVersionException, if there are essential
// fields that cannot be represented in the chosen version.
request = builder.build();
doSend(clientRequest, isInternalRequest, now, builder.build(version));
} catch (UnsupportedVersionException e) {
// If the version is not supported, skip sending the request over the wire.
// Instead, simply add it to the local queue of aborted requests.
log.debug("Version mismatch when attempting to send {} to {}",
clientRequest.toString(), clientRequest.destination(), e);
ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(),
ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.desiredOrLatestVersion()),
clientRequest.callback(), clientRequest.destination(), now, now,
false, e, null);
abortedSends.add(clientResponse);
return;
}
RequestHeader header = clientRequest.makeHeader();
}

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The awkwardness is that the request objects themselves are kind of a pointless intermediate stage between the builders and the transformation to the Send (for the client anyway). I guess this will be resolved when the builders go away (in effect, the request classes become the builders). I'd probably be inclined to do it in one shot, but up to you. One challenge is dealing with the fact that the request version must be known when the server receives it, but unknown at the time the client creates it.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree that the builders should go away. Let's do that in a follow-up PR though. :)

The request version thing isn't an issue because it's in the header field as well, I think.

String nodeId = clientRequest.destination();
RequestHeader header = clientRequest.makeHeader(request.version());
if (log.isDebugEnabled()) {
int latestClientVersion = ProtoUtils.latestVersion(clientRequest.apiKey().id);
if (header.apiVersion() == latestClientVersion) {
log.trace("Sending {} to node {}.", request, nodeId);
} else {
log.debug("Using older server API v{} to send {} to node {}.",
header.apiVersion(), request, nodeId);
header.apiVersion(), request, nodeId);
}
}
Send send = request.toSend(nodeId, header);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -697,11 +697,10 @@ private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, Of
if (generation == null)
return RequestFuture.failure(new CommitFailedException());

OffsetCommitRequest.Builder builder =
new OffsetCommitRequest.Builder(this.groupId, offsetData).
setGenerationId(generation.generationId).
setMemberId(generation.memberId).
setRetentionTime(OffsetCommitRequest.DEFAULT_RETENTION_TIME);
OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(this.groupId, offsetData).
setGenerationId(generation.generationId).
setMemberId(generation.memberId).
setRetentionTime(OffsetCommitRequest.DEFAULT_RETENTION_TIME);

log.trace("Sending OffsetCommit request with {} to coordinator {} for group {}", offsets, coordinator, groupId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,9 @@ private void checkDisconnects(long now) {
iterator.remove();
for (ClientRequest request : requestEntry.getValue()) {
RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback();
handler.onComplete(new ClientResponse(request.makeHeader(), request.callback(), request.destination(),
request.createdTimeMs(), now, true, null, null));
handler.onComplete(new ClientResponse(request.makeHeader(request.requestBuilder().desiredOrLatestVersion()),
request.callback(), request.destination(), request.createdTimeMs(), now, true,
null, null));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,11 @@ public void onSuccess(ClientResponse resp) {
long fetchOffset = request.fetchData().get(partition).offset;
FetchResponse.PartitionData fetchData = entry.getValue();
completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
request.version()));
resp.requestHeader().apiVersion()));
}

sensors.fetchLatency.record(resp.requestLatencyMs());
sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
sensors.fetchThrottleTimeSensor.record(response.throttleTimeMs());
}

@Override
Expand Down Expand Up @@ -603,13 +603,12 @@ public void onFailure(RuntimeException e) {
* @return A response which can be polled to obtain the corresponding timestamps and offsets.
*/
private RequestFuture<Map<TopicPartition, OffsetData>> sendListOffsetRequest(final Node node,
final Map<TopicPartition, Long> timestampsToSearch,
boolean requireTimestamp) {
ListOffsetRequest.Builder builder = new ListOffsetRequest.Builder().setTargetTimes(timestampsToSearch);

// If we need a timestamp in the response, the minimum RPC version we can send is v1.
// Otherwise, v0 is OK.
builder.setMinVersion(requireTimestamp ? (short) 1 : (short) 0);
final Map<TopicPartition, Long> timestampsToSearch,
boolean requireTimestamp) {
// If we need a timestamp in the response, the minimum RPC version we can send is v1. Otherwise, v0 is OK.
short minVersion = requireTimestamp ? (short) 1 : (short) 0;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have to do this here, but as I'm reading this, I wonder why we can't push this logic into the builder. For example, we can add a builder method requireTimestamp(boolean).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds like a good idea

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll swap the minVersion parameter of forConsumer with a requireTimestamp parameter.

ListOffsetRequest.Builder builder = ListOffsetRequest.Builder.forConsumer(minVersion)
.setTargetTimes(timestampsToSearch);

log.trace("Sending ListOffsetRequest {} to broker {}", builder, node);
return client.send(node, builder)
Expand Down Expand Up @@ -733,7 +732,7 @@ private Map<Node, FetchRequest.Builder> createFetchRequests() {
Map<Node, FetchRequest.Builder> requests = new HashMap<>();
for (Map.Entry<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
Node node = entry.getKey();
FetchRequest.Builder fetch = new FetchRequest.Builder(this.maxWaitMs, this.minBytes, entry.getValue()).
FetchRequest.Builder fetch = FetchRequest.Builder.forConsumer(this.maxWaitMs, this.minBytes, entry.getValue()).
setMaxBytes(this.maxBytes);
requests.put(node, fetch);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,40 +25,39 @@
import java.nio.ByteBuffer;

public abstract class AbstractRequest extends AbstractRequestResponse {
private final short version;

public static abstract class Builder<T extends AbstractRequest> {
private final ApiKeys apiKey;
private short version;
private final Short desiredVersion;

public Builder(ApiKeys apiKey) {
this(apiKey, null);
}

public Builder(ApiKeys apiKey, Short desiredVersion) {
this.apiKey = apiKey;
this.version = ProtoUtils.latestVersion(apiKey.id);
this.desiredVersion = desiredVersion;
}

public ApiKeys apiKey() {
return apiKey;
}

public Builder<T> setVersion(short version) {
this.version = version;
return this;
public short desiredOrLatestVersion() {
return desiredVersion == null ? ProtoUtils.latestVersion(apiKey.id) : desiredVersion;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor suggestion: maybe we can just add a latestVersion field to ApiKeys and remove all the direct calls to ProtoUtils.latestVersion?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

public short version() {
return version;
public T build() {
return build(desiredOrLatestVersion());
}

public abstract T build();
public abstract T build(short version);
}

public AbstractRequest(Struct struct, short version) {
super(struct);
this.version = version;
}
private final short version;

public Send toSend(String destination, RequestHeader header) {
return new NetworkSend(destination, serialize(header, this));
public AbstractRequest(short version) {
this.version = version;
}

/**
Expand All @@ -68,6 +67,19 @@ public short version() {
return version;
}

public Send toSend(String destination, RequestHeader header) {
return new NetworkSend(destination, serialize(header));
}

/**
* Use with care, typically {@link #toSend(String, RequestHeader)} should be used instead.
*/
public ByteBuffer serialize(RequestHeader header) {
return serialize(header.toStruct(), toStruct());
}

protected abstract Struct toStruct();

/**
* Get an error response for a request
*/
Expand All @@ -76,54 +88,78 @@ public short version() {
/**
* Factory method for getting a request object based on ApiKey ID and a buffer
*/
public static AbstractRequest getRequest(int requestId, short versionId, ByteBuffer buffer) {
public static RequestAndSize getRequest(int requestId, short version, ByteBuffer buffer) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess another option is to add the request size as a field in ProduceRequest, since that appears to be the only usage.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had that it that way first, but it's a bit weird because we don't always need the size and we can't compute it unless the Struct constructor is used. This way is reasonably clean and the cost seems low. What do you think?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another alternative is to add the size to AbstractRequest perhaps?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that works because we often don't have (and don't need) a size. We had discussed renaming RequestAndSize to RequestMetadata and use that class to keep the request version when we remove the builders. However, the request version is already in the header field, so it seems like we don't need it in this other class. So, I left it as RequestAndSize for now.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. We can always rename the class if we need to add some more info in there.

ApiKeys apiKey = ApiKeys.forId(requestId);
Struct struct = ProtoUtils.parseRequest(apiKey.id, version, buffer);
AbstractRequest request;
switch (apiKey) {
case PRODUCE:
return ProduceRequest.parse(buffer, versionId);
request = new ProduceRequest(struct, version);
break;
case FETCH:
return FetchRequest.parse(buffer, versionId);
request = new FetchRequest(struct, version);
break;
case LIST_OFFSETS:
return ListOffsetRequest.parse(buffer, versionId);
request = new ListOffsetRequest(struct, version);
break;
case METADATA:
return MetadataRequest.parse(buffer, versionId);
request = new MetadataRequest(struct, version);
break;
case OFFSET_COMMIT:
return OffsetCommitRequest.parse(buffer, versionId);
request = new OffsetCommitRequest(struct, version);
break;
case OFFSET_FETCH:
return OffsetFetchRequest.parse(buffer, versionId);
request = new OffsetFetchRequest(struct, version);
break;
case GROUP_COORDINATOR:
return GroupCoordinatorRequest.parse(buffer, versionId);
request = new GroupCoordinatorRequest(struct, version);
break;
case JOIN_GROUP:
return JoinGroupRequest.parse(buffer, versionId);
request = new JoinGroupRequest(struct, version);
break;
case HEARTBEAT:
return HeartbeatRequest.parse(buffer, versionId);
request = new HeartbeatRequest(struct, version);
break;
case LEAVE_GROUP:
return LeaveGroupRequest.parse(buffer, versionId);
request = new LeaveGroupRequest(struct, version);
break;
case SYNC_GROUP:
return SyncGroupRequest.parse(buffer, versionId);
request = new SyncGroupRequest(struct, version);
break;
case STOP_REPLICA:
return StopReplicaRequest.parse(buffer, versionId);
request = new StopReplicaRequest(struct, version);
break;
case CONTROLLED_SHUTDOWN_KEY:
return ControlledShutdownRequest.parse(buffer, versionId);
request = new ControlledShutdownRequest(struct, version);
break;
case UPDATE_METADATA_KEY:
return UpdateMetadataRequest.parse(buffer, versionId);
request = new UpdateMetadataRequest(struct, version);
break;
case LEADER_AND_ISR:
return LeaderAndIsrRequest.parse(buffer, versionId);
request = new LeaderAndIsrRequest(struct, version);
break;
case DESCRIBE_GROUPS:
return DescribeGroupsRequest.parse(buffer, versionId);
request = new DescribeGroupsRequest(struct, version);
break;
case LIST_GROUPS:
return ListGroupsRequest.parse(buffer, versionId);
request = new ListGroupsRequest(struct, version);
break;
case SASL_HANDSHAKE:
return SaslHandshakeRequest.parse(buffer, versionId);
request = new SaslHandshakeRequest(struct, version);
break;
case API_VERSIONS:
return ApiVersionsRequest.parse(buffer, versionId);
request = new ApiVersionsRequest(struct, version);
break;
case CREATE_TOPICS:
return CreateTopicsRequest.parse(buffer, versionId);
request = new CreateTopicsRequest(struct, version);
break;
case DELETE_TOPICS:
return DeleteTopicsRequest.parse(buffer, versionId);
request = new DeleteTopicsRequest(struct, version);
break;
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " +
"code should be updated to do so.", apiKey));
}
return new RequestAndSize(request, struct.sizeOf());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,56 +17,13 @@
import java.nio.ByteBuffer;

public abstract class AbstractRequestResponse {
protected final Struct struct;

public AbstractRequestResponse(Struct struct) {
this.struct = struct;
}

public Struct toStruct() {
return struct;
}

/**
* Get the serialized size of this object
*/
public int sizeOf() {
return struct.sizeOf();
}

/**
* Write this object to a buffer
* Visible for testing.
*/
public void writeTo(ByteBuffer buffer) {
struct.writeTo(buffer);
}

@Override
public String toString() {
return struct.toString();
}

@Override
public int hashCode() {
return struct.hashCode();
}

@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
AbstractRequestResponse other = (AbstractRequestResponse) obj;
return struct.equals(other.struct);
}

public static ByteBuffer serialize(AbstractRequestResponse header, AbstractRequestResponse body) {
ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + body.sizeOf());
header.writeTo(buffer);
body.writeTo(buffer);
public static ByteBuffer serialize(Struct headerStruct, Struct bodyStruct) {
ByteBuffer buffer = ByteBuffer.allocate(headerStruct.sizeOf() + bodyStruct.sizeOf());
headerStruct.writeTo(buffer);
bodyStruct.writeTo(buffer);
buffer.rewind();
return buffer;
}
Expand Down
Loading