diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 99cdbf9f57988..3644f9ce079e3 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -31,7 +31,6 @@ import static org.apache.kafka.common.protocol.types.Type.INT16; import static org.apache.kafka.common.protocol.types.Type.INT32; import static org.apache.kafka.common.protocol.types.Type.INT64; -import static org.apache.kafka.common.protocol.types.Type.INT8; import static org.apache.kafka.common.protocol.types.Type.STRING; import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING; @@ -671,8 +670,7 @@ public class Protocol { public static final Schema STOP_REPLICA_REQUEST_V0 = new Schema(new Field("controller_id", INT32, "The controller id."), new Field("controller_epoch", INT32, "The controller epoch."), - new Field("delete_partitions", - INT8, + new Field("delete_partitions", BOOLEAN, "Boolean which indicates if replica's partitions must be deleted."), new Field("partitions", new ArrayOf(STOP_REPLICA_REQUEST_PARTITION_V0))); @@ -1008,4 +1006,4 @@ public static void main(String[] args) { System.out.println(toHtml()); } -} \ No newline at end of file +} diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java index 80223a17bc196..bc63521bd8274 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java @@ -49,7 +49,7 @@ public StopReplicaRequest(int controllerId, int controllerEpoch, boolean deleteP struct.set(CONTROLLER_ID_KEY_NAME, controllerId); struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch); - struct.set(DELETE_PARTITIONS_KEY_NAME, deletePartitions ? (byte) 1 : (byte) 0); + struct.set(DELETE_PARTITIONS_KEY_NAME, deletePartitions); List partitionDatas = new ArrayList<>(partitions.size()); for (TopicPartition partition : partitions) { @@ -80,7 +80,7 @@ public StopReplicaRequest(Struct struct) { controllerId = struct.getInt(CONTROLLER_ID_KEY_NAME); controllerEpoch = struct.getInt(CONTROLLER_EPOCH_KEY_NAME); - deletePartitions = ((byte) struct.get(DELETE_PARTITIONS_KEY_NAME)) != 0; + deletePartitions = struct.getBoolean(DELETE_PARTITIONS_KEY_NAME); } @Override diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 345de3f957ae0..043582d44b922 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -83,8 +83,9 @@ public void testSerialization() throws Exception { createProduceRequest(), createProduceRequest().getErrorResponse(2, new UnknownServerException()), createProduceResponse(), - createStopReplicaRequest(), - createStopReplicaRequest().getErrorResponse(0, new UnknownServerException()), + createStopReplicaRequest(true), + createStopReplicaRequest(false), + createStopReplicaRequest(true).getErrorResponse(0, new UnknownServerException()), createStopReplicaResponse(), createUpdateMetadataRequest(2, "rack1"), createUpdateMetadataRequest(2, null), @@ -348,9 +349,9 @@ private AbstractRequestResponse createProduceResponse() { return new ProduceResponse(responseData, 0); } - private AbstractRequest createStopReplicaRequest() { + private AbstractRequest createStopReplicaRequest(boolean deletePartitions) { Set partitions = new HashSet<>(Arrays.asList(new TopicPartition("test", 0))); - return new StopReplicaRequest(0, 1, true, partitions); + return new StopReplicaRequest(0, 1, deletePartitions, partitions); } private AbstractRequestResponse createStopReplicaResponse() { @@ -450,4 +451,4 @@ private AbstractRequestResponse createApiVersionResponse() { List apiVersions = Arrays.asList(new ApiVersionsResponse.ApiVersion((short) 0, (short) 0, (short) 2)); return new ApiVersionsResponse(Errors.NONE.code(), apiVersions); } -} \ No newline at end of file +}