From dc98cbe57bc1459dea1e9ebe218185a0284dc52f Mon Sep 17 00:00:00 2001 From: Justine Olshan Date: Tue, 15 Jun 2021 06:09:46 -0700 Subject: [PATCH 1/2] KAFKA-12701: NPE in MetadataRequest when using topic IDs (#10584) We prevent handling MetadataRequests where the topic name is null (to prevent NPE) as well as prevent requests that set topic IDs since this functionality has not yet been implemented. When we do implement it in https://github.com/apache/kafka/pull/9769, we should bump the request/response version. Added tests to ensure the error is thrown. Reviewers: dengziming , Ismael Juma --- .../common/requests/MetadataRequest.java | 21 ++++++++-- .../common/message/MetadataRequest.json | 3 +- .../common/requests/MetadataRequestTest.java | 25 ++++++++++++ .../main/scala/kafka/server/KafkaApis.scala | 11 ++++++ .../unit/kafka/server/KafkaApisTest.scala | 38 +++++++++++++++++++ 5 files changed, 94 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java index 816f600061568..d38e9ac47b80a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.Uuid; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.MetadataRequestData; import org.apache.kafka.common.message.MetadataRequestData.MetadataRequestTopic; @@ -92,6 +93,16 @@ public MetadataRequest build(short version) { if (!data.allowAutoTopicCreation() && version < 4) throw new UnsupportedVersionException("MetadataRequest versions older than 4 don't support the " + "allowAutoTopicCreation field"); + if (data.topics() != null) { + data.topics().forEach(topic -> { + if (topic.name() == null) + throw new UnsupportedVersionException("MetadataRequest version " + version + + " does not support null topic names."); + if (topic.topicId() != Uuid.ZERO_UUID) + throw new UnsupportedVersionException("MetadataRequest version " + version + + " does not support non-zero topic IDs."); + }); + } return new MetadataRequest(data, version); } @@ -117,13 +128,17 @@ public MetadataRequestData data() { public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { Errors error = Errors.forException(e); MetadataResponseData responseData = new MetadataResponseData(); - if (topics() != null) { - for (String topic : topics()) + if (data.topics() != null) { + for (MetadataRequestTopic topic : data.topics()) { + // the response does not allow null, so convert to empty string if necessary + String topicName = topic.name() == null ? "" : topic.name(); responseData.topics().add(new MetadataResponseData.MetadataResponseTopic() - .setName(topic) + .setName(topicName) + .setTopicId(topic.topicId()) .setErrorCode(error.code()) .setIsInternal(false) .setPartitions(Collections.emptyList())); + } } responseData.setThrottleTimeMs(throttleTimeMs); diff --git a/clients/src/main/resources/common/message/MetadataRequest.json b/clients/src/main/resources/common/message/MetadataRequest.json index e5083a84ea343..a1634b19970d7 100644 --- a/clients/src/main/resources/common/message/MetadataRequest.json +++ b/clients/src/main/resources/common/message/MetadataRequest.json @@ -33,7 +33,8 @@ // // Version 9 is the first flexible version. // - // Version 10 adds topicId. + // Version 10 adds topicId and allows name field to be null. However, this functionality was not implemented on the server. + // Versions 10 and 11 should not use the topicId field or set topic name to null. // // Version 11 deprecates IncludeClusterAuthorizedOperations field. This is now exposed // by the DescribeCluster API (KIP-700). diff --git a/clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java index e51523297597d..74c217df91f86 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/MetadataRequestTest.java @@ -16,16 +16,21 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.message.MetadataRequestData; import org.apache.kafka.common.protocol.ApiKeys; import org.junit.jupiter.api.Test; +import java.util.Arrays; import java.util.Collections; +import java.util.List; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertThrows; public class MetadataRequestTest { @@ -65,4 +70,24 @@ public void testMetadataRequestVersion() { assertEquals(minVersion, builder3.oldestAllowedVersion()); assertEquals(maxVersion, builder3.latestAllowedVersion()); } + + @Test + public void testTopicIdAndNullTopicNameRequests() { + // Construct invalid MetadataRequestTopics. We will build each one separately and ensure the error is thrown. + List topics = Arrays.asList( + new MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(Uuid.randomUuid()), + new MetadataRequestData.MetadataRequestTopic().setName(null), + new MetadataRequestData.MetadataRequestTopic().setTopicId(Uuid.randomUuid()), + new MetadataRequestData.MetadataRequestTopic().setName("topic").setTopicId(Uuid.randomUuid())); + + // if version is 10 or 11, the invalid topic metadata should return an error + List invalidVersions = Arrays.asList((short) 10, (short) 11); + invalidVersions.forEach(version -> + topics.forEach(topic -> { + MetadataRequestData metadataRequestData = new MetadataRequestData().setTopics(Collections.singletonList(topic)); + MetadataRequest.Builder builder = new MetadataRequest.Builder(metadataRequestData); + assertThrows(UnsupportedVersionException.class, () -> builder.build(version)); + }) + ); + } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index a9812fbeb35b8..e3b4947264255 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -1137,6 +1137,17 @@ class KafkaApis(val requestChannel: RequestChannel, val metadataRequest = request.body[MetadataRequest] val requestVersion = request.header.apiVersion + // Topic IDs are not supported for versions 10 and 11. Topic names can not be null in these versions. + if (!metadataRequest.isAllTopics) { + metadataRequest.data.topics.forEach{ topic => + if (topic.name == null) { + throw new InvalidRequestException(s"Topic name can not be null for version ${metadataRequest.version}") + } else if (topic.topicId != Uuid.ZERO_UUID) { + throw new InvalidRequestException(s"Topic IDs are not supported in requests for version ${metadataRequest.version}") + } + } + } + val topics = if (metadataRequest.isAllTopics) metadataCache.getAllTopics() else diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index b89b29c30de39..82b76c769ff44 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -1066,6 +1066,44 @@ class KafkaApisTest { numBrokersNeeded - 1) } + @Test + def testInvalidMetadataRequestReturnsError(): Unit = { + // Construct invalid MetadataRequestTopics. We will try each one separately and ensure the error is thrown. + val topics = List(new MetadataRequestData.MetadataRequestTopic().setName(null).setTopicId(Uuid.randomUuid()), + new MetadataRequestData.MetadataRequestTopic().setName(null), + new MetadataRequestData.MetadataRequestTopic().setTopicId(Uuid.randomUuid()), + new MetadataRequestData.MetadataRequestTopic().setName("topic1").setTopicId(Uuid.randomUuid())) + + EasyMock.replay(replicaManager, clientRequestQuotaManager, + autoTopicCreationManager, forwardingManager, clientControllerQuotaManager, groupCoordinator, txnCoordinator) + + // if version is 10 or 11, the invalid topic metadata should return an error + val invalidVersions = Set(10, 11) + invalidVersions.foreach( version => + topics.foreach(topic => { + val metadataRequestData = new MetadataRequestData().setTopics(Collections.singletonList(topic)) + val request = buildRequest(new MetadataRequest(metadataRequestData, version.toShort)) + val kafkaApis = createKafkaApis() + + val capturedResponse = EasyMock.newCapture[AbstractResponse]() + EasyMock.expect(requestChannel.sendResponse( + EasyMock.eq(request), + EasyMock.capture(capturedResponse), + EasyMock.anyObject() + )) + + EasyMock.replay(requestChannel) + kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching) + + val response = capturedResponse.getValue.asInstanceOf[MetadataResponse] + assertEquals(1, response.topicMetadata.size) + assertEquals(1, response.errorCounts.get(Errors.INVALID_REQUEST)) + response.data.topics.forEach(topic => assertNotEquals(null, topic.name)) + reset(requestChannel) + }) + ) + } + @Test def testOffsetCommitWithInvalidPartition(): Unit = { val topic = "topic" From 1cb6b898c0f2786afed6d221818f89d24cda534f Mon Sep 17 00:00:00 2001 From: Justine Date: Tue, 15 Jun 2021 09:51:14 -0700 Subject: [PATCH 2/2] Fix KafkaApisTest --- .../scala/unit/kafka/server/KafkaApisTest.scala | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 82b76c769ff44..f059b425e2658 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -1082,20 +1082,17 @@ class KafkaApisTest { invalidVersions.foreach( version => topics.foreach(topic => { val metadataRequestData = new MetadataRequestData().setTopics(Collections.singletonList(topic)) - val request = buildRequest(new MetadataRequest(metadataRequestData, version.toShort)) + val metadataRequest = new MetadataRequest(metadataRequestData, version.toShort) + val request = buildRequest(metadataRequest) val kafkaApis = createKafkaApis() - val capturedResponse = EasyMock.newCapture[AbstractResponse]() - EasyMock.expect(requestChannel.sendResponse( - EasyMock.eq(request), - EasyMock.capture(capturedResponse), - EasyMock.anyObject() - )) + val capturedResponse = EasyMock.newCapture[RequestChannel.Response]() + EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse))) EasyMock.replay(requestChannel) - kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching) + kafkaApis.handle(request) - val response = capturedResponse.getValue.asInstanceOf[MetadataResponse] + val response = readResponse(metadataRequest, capturedResponse).asInstanceOf[MetadataResponse] assertEquals(1, response.topicMetadata.size) assertEquals(1, response.errorCounts.get(Errors.INVALID_REQUEST)) response.data.topics.forEach(topic => assertNotEquals(null, topic.name))