Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

public class StreamsInvalidTopologyEpochException extends ApiException {
public StreamsInvalidTopologyEpochException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

public class StreamsInvalidTopologyException extends ApiException {
public StreamsInvalidTopologyException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

public class StreamsTopologyFencedException extends ApiException {
public StreamsTopologyFencedException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ public enum ApiKeys {
READ_SHARE_GROUP_STATE(ApiMessageType.READ_SHARE_GROUP_STATE, true),
WRITE_SHARE_GROUP_STATE(ApiMessageType.WRITE_SHARE_GROUP_STATE, true),
DELETE_SHARE_GROUP_STATE(ApiMessageType.DELETE_SHARE_GROUP_STATE, true),
READ_SHARE_GROUP_STATE_SUMMARY(ApiMessageType.READ_SHARE_GROUP_STATE_SUMMARY, true);
READ_SHARE_GROUP_STATE_SUMMARY(ApiMessageType.READ_SHARE_GROUP_STATE_SUMMARY, true),
STREAMS_GROUP_HEARTBEAT(ApiMessageType.STREAMS_GROUP_HEARTBEAT);


private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER =
new EnumMap<>(ApiMessageType.ListenerType.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@
import org.apache.kafka.common.errors.SnapshotNotFoundException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.StreamsInvalidTopologyEpochException;
import org.apache.kafka.common.errors.StreamsInvalidTopologyException;
import org.apache.kafka.common.errors.StreamsTopologyFencedException;
import org.apache.kafka.common.errors.TelemetryTooLargeException;
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
import org.apache.kafka.common.errors.TimeoutException;
Expand Down Expand Up @@ -413,7 +416,10 @@ public enum Errors {
DUPLICATE_VOTER(126, "The voter is already part of the set of voters.", DuplicateVoterException::new),
VOTER_NOT_FOUND(127, "The voter is not part of the set of voters.", VoterNotFoundException::new),
INVALID_REGULAR_EXPRESSION(128, "The regular expression is not valid.", InvalidRegularExpression::new),
REBOOTSTRAP_REQUIRED(129, "Client metadata is stale, client should rebootstrap to obtain new metadata.", RebootstrapRequiredException::new);
REBOOTSTRAP_REQUIRED(129, "Client metadata is stale, client should rebootstrap to obtain new metadata.", RebootstrapRequiredException::new),
STREAMS_INVALID_TOPOLOGY(130, "The supplied topology is invalid.", StreamsInvalidTopologyException::new),
STREAMS_INVALID_TOPOLOGY_EPOCH(131, "The supplied topology epoch is invalid.", StreamsInvalidTopologyEpochException::new),
STREAMS_TOPOLOGY_FENCED(132, "The supplied topology epoch is outdated.", StreamsTopologyFencedException::new);

private static final Logger log = LoggerFactory.getLogger(Errors.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,8 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion,
return DeleteShareGroupStateRequest.parse(buffer, apiVersion);
case READ_SHARE_GROUP_STATE_SUMMARY:
return ReadShareGroupStateSummaryRequest.parse(buffer, apiVersion);
case STREAMS_GROUP_HEARTBEAT:
return StreamsGroupHeartbeatRequest.parse(buffer, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer response
return DeleteShareGroupStateResponse.parse(responseBuffer, version);
case READ_SHARE_GROUP_STATE_SUMMARY:
return ReadShareGroupStateSummaryResponse.parse(responseBuffer, version);
case STREAMS_GROUP_HEARTBEAT:
return StreamsGroupHeartbeatResponse.parse(responseBuffer, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.StreamsGroupHeartbeatRequestData;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;

public class StreamsGroupHeartbeatRequest extends AbstractRequest {

/**
* A member epoch of <code>-1</code> means that the member wants to leave the group.
*/
public static final int LEAVE_GROUP_MEMBER_EPOCH = -1;
public static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;

/**
* A member epoch of <code>0</code> means that the member wants to join the group.
*/
public static final int JOIN_GROUP_MEMBER_EPOCH = 0;

public static class Builder extends AbstractRequest.Builder<StreamsGroupHeartbeatRequest> {
private final StreamsGroupHeartbeatRequestData data;

public Builder(StreamsGroupHeartbeatRequestData data) {
this(data, false);
}

public Builder(StreamsGroupHeartbeatRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.STREAMS_GROUP_HEARTBEAT, enableUnstableLastVersion);
this.data = data;
}

@Override
public StreamsGroupHeartbeatRequest build(short version) {
return new StreamsGroupHeartbeatRequest(data, version);
}

@Override
public String toString() {
return data.toString();
}
}

private final StreamsGroupHeartbeatRequestData data;

public StreamsGroupHeartbeatRequest(StreamsGroupHeartbeatRequestData data, short version) {
super(ApiKeys.STREAMS_GROUP_HEARTBEAT, version);
this.data = data;
}

@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
return new StreamsGroupHeartbeatResponse(
new StreamsGroupHeartbeatResponseData()
.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(Errors.forException(e).code())
);
}

@Override
public StreamsGroupHeartbeatRequestData data() {
return data;
}

public static StreamsGroupHeartbeatRequest parse(ByteBuffer buffer, short version) {
return new StreamsGroupHeartbeatRequest(new StreamsGroupHeartbeatRequestData(
new ByteBufferAccessor(buffer), version), version);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.requests;

import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;

/**
* Possible error codes.
*
* - {@link Errors#GROUP_AUTHORIZATION_FAILED}
* - {@link Errors#NOT_COORDINATOR}
* - {@link Errors#COORDINATOR_NOT_AVAILABLE}
* - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS}
* - {@link Errors#INVALID_REQUEST}
* - {@link Errors#UNKNOWN_MEMBER_ID}
* - {@link Errors#FENCED_MEMBER_EPOCH}
* - {@link Errors#UNRELEASED_INSTANCE_ID}
* - {@link Errors#GROUP_MAX_SIZE_REACHED}
* - {@link Errors#GROUP_ID_NOT_FOUND}
* - {@link Errors#TOPIC_AUTHORIZATION_FAILED}
* - {@link Errors#CLUSTER_AUTHORIZATION_FAILED}
* - {@link Errors#STREAMS_INVALID_TOPOLOGY}
* - {@link Errors#STREAMS_INVALID_TOPOLOGY_EPOCH}
* - {@link Errors#STREAMS_TOPOLOGY_FENCED}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the KIP, we also have the following errors that are not in this list:

  • GROUP_ID_NOT_FOUND
  • TOPIC_AUTHORIZATION_FAILED
  • CLUSTER_AUTHORIZATION_FAILED

In this list, the following errors are not in the KIP:

  • UNSUPPORTED_ASSIGNOR

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

*/
public class StreamsGroupHeartbeatResponse extends AbstractResponse {
private final StreamsGroupHeartbeatResponseData data;

public StreamsGroupHeartbeatResponse(StreamsGroupHeartbeatResponseData data) {
super(ApiKeys.STREAMS_GROUP_HEARTBEAT);
this.data = data;
}

@Override
public StreamsGroupHeartbeatResponseData data() {
return data;
}

@Override
public Map<Errors, Integer> errorCounts() {
return Collections.singletonMap(Errors.forCode(data.errorCode()), 1);
}

@Override
public int throttleTimeMs() {
return data.throttleTimeMs();
}

@Override
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
data.setThrottleTimeMs(throttleTimeMs);
}

public static StreamsGroupHeartbeatResponse parse(ByteBuffer buffer, short version) {
return new StreamsGroupHeartbeatResponse(new StreamsGroupHeartbeatResponseData(
new ByteBufferAccessor(buffer), version));
}

public enum Status {
STALE_TOPOLOGY((byte) 0, "The topology epoch supplied is inconsistent with the topology for this streams group."),
MISSING_SOURCE_TOPICS((byte) 1, "One or more source topics are missing or a source topic regex resolves to zero topics."),
INCORRECTLY_PARTITIONED_TOPICS((byte) 2, "One or more topics expected to be copartitioned are not copartitioned."),
MISSING_INTERNAL_TOPICS((byte) 3, "One or more internal topics are missing."),
SHUTDOWN_APPLICATION((byte) 4, "A client requested the shutdown of the whole application.");

private final byte code;
private final String message;

Status(final byte code, final String message) {
this.code = code;
this.message = message;
}

public byte code() {
return code;
}

public String message() {
return message;
}
}
}
Loading