Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ public enum ApiKeys {
WRITE_SHARE_GROUP_STATE(ApiMessageType.WRITE_SHARE_GROUP_STATE, true),
DELETE_SHARE_GROUP_STATE(ApiMessageType.DELETE_SHARE_GROUP_STATE, true),
READ_SHARE_GROUP_STATE_SUMMARY(ApiMessageType.READ_SHARE_GROUP_STATE_SUMMARY, true),
STREAMS_GROUP_HEARTBEAT(ApiMessageType.STREAMS_GROUP_HEARTBEAT);
STREAMS_GROUP_HEARTBEAT(ApiMessageType.STREAMS_GROUP_HEARTBEAT),
STREAMS_GROUP_DESCRIBE(ApiMessageType.STREAMS_GROUP_DESCRIBE);


private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,8 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion,
return ReadShareGroupStateSummaryRequest.parse(buffer, apiVersion);
case STREAMS_GROUP_HEARTBEAT:
return StreamsGroupHeartbeatRequest.parse(buffer, apiVersion);
case STREAMS_GROUP_DESCRIBE:
return StreamsGroupDescribeRequest.parse(buffer, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,8 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer response
return ReadShareGroupStateSummaryResponse.parse(responseBuffer, version);
case STREAMS_GROUP_HEARTBEAT:
return StreamsGroupHeartbeatResponse.parse(responseBuffer, version);
case STREAMS_GROUP_DESCRIBE:
return StreamsGroupDescribeResponse.parse(responseBuffer, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.StreamsGroupDescribeRequestData;
import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.stream.Collectors;

public class StreamsGroupDescribeRequest extends AbstractRequest {

public static class Builder extends AbstractRequest.Builder<StreamsGroupDescribeRequest> {

private final StreamsGroupDescribeRequestData data;

public Builder(StreamsGroupDescribeRequestData data) {
this(data, false);
}

public Builder(StreamsGroupDescribeRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.STREAMS_GROUP_DESCRIBE, enableUnstableLastVersion);
this.data = data;
}

@Override
public StreamsGroupDescribeRequest build(short version) {
return new StreamsGroupDescribeRequest(data, version);
}

@Override
public String toString() {
return data.toString();
}
}

private final StreamsGroupDescribeRequestData data;

public StreamsGroupDescribeRequest(StreamsGroupDescribeRequestData data, short version) {
super(ApiKeys.STREAMS_GROUP_DESCRIBE, version);
this.data = data;
}

@Override
public StreamsGroupDescribeResponse getErrorResponse(int throttleTimeMs, Throwable e) {
StreamsGroupDescribeResponseData data = new StreamsGroupDescribeResponseData()
.setThrottleTimeMs(throttleTimeMs);
// Set error for each group
this.data.groupIds().forEach(
groupId -> data.groups().add(
new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupId)
.setErrorCode(Errors.forException(e).code())
)
);
return new StreamsGroupDescribeResponse(data);
}

@Override
public StreamsGroupDescribeRequestData data() {
return data;
}

public static StreamsGroupDescribeRequest parse(ByteBuffer buffer, short version) {
return new StreamsGroupDescribeRequest(
new StreamsGroupDescribeRequestData(new ByteBufferAccessor(buffer), version),
version
);
}

public static List<StreamsGroupDescribeResponseData.DescribedGroup> getErrorDescribedGroupList(
List<String> groupIds,
Errors error
) {
return groupIds.stream()
.map(groupId -> new StreamsGroupDescribeResponseData.DescribedGroup()
.setGroupId(groupId)
.setErrorCode(error.code())
).collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

/**
* Possible error codes.
*
* - {@link Errors#GROUP_AUTHORIZATION_FAILED}
* - {@link Errors#NOT_COORDINATOR}
* - {@link Errors#COORDINATOR_NOT_AVAILABLE}
* - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
* - {@link Errors#INVALID_REQUEST}
* - {@link Errors#INVALID_GROUP_ID}
* - {@link Errors#GROUP_ID_NOT_FOUND}
*/
public class StreamsGroupDescribeResponse extends AbstractResponse {

private final StreamsGroupDescribeResponseData data;

public StreamsGroupDescribeResponse(StreamsGroupDescribeResponseData data) {
super(ApiKeys.STREAMS_GROUP_DESCRIBE);
this.data = data;
}

@Override
public StreamsGroupDescribeResponseData data() {
return data;
}

@Override
public Map<Errors, Integer> errorCounts() {
HashMap<Errors, Integer> counts = new HashMap<>();
data.groups().forEach(
group -> updateErrorCounts(counts, Errors.forCode(group.errorCode()))
);
return counts;
}

@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}

@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}

public static StreamsGroupDescribeResponse parse(ByteBuffer buffer, short version) {
return new StreamsGroupDescribeResponse(
new StreamsGroupDescribeResponseData(new ByteBufferAccessor(buffer), version)
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 89,
"type": "request",
"listeners": ["broker", "zkBroker"],
"name": "StreamsGroupDescribeRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "GroupIds", "type": "[]string", "versions": "0+", "entityType": "groupId",
"about": "The ids of the groups to describe" },
{ "name": "IncludeAuthorizedOperations", "type": "bool", "versions": "0+",
"about": "Whether to include authorized operations." }
]
}
Loading