From 2b7ad8b57188dcd3f2a48d0ce9960b06a5c03ca9 Mon Sep 17 00:00:00 2001 From: Edoardo Comar Date: Thu, 26 May 2016 14:18:12 +0100 Subject: [PATCH 01/14] KAFKA-3396 : Unauthorized topics are returned to the user Modified KafkaApis to return Errors.UNKNOWN_TOPIC_OR_PARTITION if principal has no Describe access to topic Unit tests expanded Some paths cause the client to block due to bug https://issues.apache.org/jira/browse/KAFKA-3727?filter=-2 tests work around this by executing in separate thread --- .../main/scala/kafka/server/KafkaApis.scala | 36 ++-- .../kafka/api/AuthorizerIntegrationTest.scala | 83 +++++++-- .../kafka/api/EndToEndAuthorizationTest.scala | 161 ++++++++++++++++-- .../scala/unit/kafka/utils/TestUtils.scala | 4 +- 4 files changed, 240 insertions(+), 44 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 85c47e6f745d3..aa606a2583ef0 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -237,12 +237,18 @@ class KafkaApis(val requestChannel: RequestChannel, val invalidRequestsInfo = offsetCommitRequest.offsetData.asScala.filter { case (topicPartition, _) => !metadataCache.contains(topicPartition.topic) } - val filteredRequestInfo = offsetCommitRequest.offsetData.asScala.toMap -- invalidRequestsInfo.keys - val (authorizedRequestInfo, unauthorizedRequestInfo) = filteredRequestInfo.partition { + val filteredRequestInfo1 = offsetCommitRequest.offsetData.asScala.toMap -- invalidRequestsInfo.keys + + //filter topics with Describe ACL as if they were non-existent + val (filteredRequestInfo2, unauthorizedForDescribeRequestInfo) = filteredRequestInfo1.partition { + case (topicPartition, offsetMetadata) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) + } + + val (authorizedRequestInfo, unauthorizedRequestInfo) = filteredRequestInfo2.partition { case (topicPartition, 
offsetMetadata) => authorize(request.session, Read, new Resource(auth.Topic, topicPartition.topic)) } - + // the callback for sending an offset commit response def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Short]) { val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED.code) @@ -253,7 +259,8 @@ class KafkaApis(val requestChannel: RequestChannel, s"on partition $topicPartition failed due to ${Errors.forCode(errorCode).exceptionName}") } } - val combinedCommitStatus = mergedCommitStatus.mapValues(new JShort(_)) ++ invalidRequestsInfo.map(_._1 -> new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)) + var combinedCommitStatus = mergedCommitStatus.mapValues(new JShort(_)) ++ invalidRequestsInfo.map(_._1 -> new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)) + combinedCommitStatus = combinedCommitStatus ++ unauthorizedForDescribeRequestInfo.mapValues(_ => Errors.UNKNOWN_TOPIC_OR_PARTITION.code).mapValues(new JShort(_)) val responseHeader = new ResponseHeader(header.correlationId) val responseBody = new OffsetCommitResponse(combinedCommitStatus.asJava) @@ -807,22 +814,29 @@ class KafkaApis(val requestChannel: RequestChannel, var (authorizedTopics, unauthorizedTopics) = topics.partition(topic => authorize(request.session, Describe, new Resource(auth.Topic, topic))) + var uncreatableTopics = Set[String]() + if (authorizedTopics.nonEmpty) { val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics) if (config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) { authorizer.foreach { az => if (!az.authorize(request.session, Create, Resource.ClusterResource)) { authorizedTopics --= nonExistingTopics - unauthorizedTopics ++= nonExistingTopics + uncreatableTopics ++= nonExistingTopics } } } } - val unauthorizedTopicMetadata = unauthorizedTopics.map(topic => - new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, Topic.isInternal(topic), + val 
uncreatableTopicMetadata = uncreatableTopics.map(topic => + new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, common.Topic.isInternal(topic), java.util.Collections.emptyList())) + // do not disclose the existence of the unauthorized topic + val unauthorizedTopicMetadata = unauthorizedTopics.map(topic => + new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, common.Topic.isInternal(topic), + java.util.Collections.emptyList())) + // In version 0, we returned an error when brokers with replicas were unavailable, // while in higher versions we simply don't include the broker in the returned broker list val errorUnavailableEndpoints = requestVersion == 0 @@ -832,7 +846,7 @@ class KafkaApis(val requestChannel: RequestChannel, else getTopicMetadata(authorizedTopics, request.securityProtocol, errorUnavailableEndpoints) - val completeTopicMetadata = topicMetadata ++ unauthorizedTopicMetadata + val completeTopicMetadata = topicMetadata ++ uncreatableTopicMetadata ++ unauthorizedTopicMetadata val brokers = metadataCache.getAliveBrokers @@ -869,8 +883,6 @@ class KafkaApis(val requestChannel: RequestChannel, val (authorizedTopicPartitions, unauthorizedTopicPartitions) = offsetFetchRequest.partitions.asScala.partition { topicPartition => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) } - val unauthorizedTopicResponse = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.TOPIC_AUTHORIZATION_FAILED.code) - val unauthorizedStatus = unauthorizedTopicPartitions.map(topicPartition => (topicPartition, unauthorizedTopicResponse)).toMap val unknownTopicPartitionResponse = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.UNKNOWN_TOPIC_OR_PARTITION.code) if (header.apiVersion == 0) { @@ -895,7 +907,7 @@ class KafkaApis(val requestChannel: RequestChannel, Errors.forException(e).code)) } }.toMap - new OffsetFetchResponse((responseInfo 
++ unauthorizedStatus).asJava) + new OffsetFetchResponse((responseInfo).asJava) } else { // version 1 reads offsets from Kafka; val offsets = coordinator.handleFetchOffsets(offsetFetchRequest.groupId, authorizedTopicPartitions).toMap @@ -903,7 +915,7 @@ class KafkaApis(val requestChannel: RequestChannel, // Note that we do not need to filter the partitions in the // metadata cache as the topic partitions will be filtered // in coordinator's offset manager through the offset cache - new OffsetFetchResponse((offsets ++ unauthorizedStatus).asJava) + new OffsetFetchResponse((offsets).asJava) } } diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 6d3b098ebc9ae..aa8538280f053 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -24,6 +24,7 @@ import kafka.server.{BaseRequestTest, KafkaConfig} import kafka.utils.TestUtils import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsumer, OffsetAndMetadata} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import org.apache.kafka.common.KafkaException import org.apache.kafka.common.errors._ import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol} import org.apache.kafka.common.requests._ @@ -37,6 +38,8 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.Buffer + + class AuthorizerIntegrationTest extends BaseRequestTest { override def numBrokers: Int = 1 @@ -282,8 +285,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { sendRecords(numRecords, tp) fail("sendRecords should have thrown") } catch { - case e: TopicAuthorizationException => - assertEquals(Collections.singleton(topic), e.unauthorizedTopics()) + case e: org.apache.kafka.common.errors.TimeoutException => 
//expected } } @@ -386,7 +388,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { consumeRecords(this.consumers.head) Assert.fail("should have thrown exception") } catch { - case e: TopicAuthorizationException => assertEquals(Collections.singleton(topic), e.unauthorizedTopics()); + case e: TestTimeoutException => //expected + } finally { + this.consumers.foreach(_.wakeup()) } } @@ -466,7 +470,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava) } - @Test(expected = classOf[TopicAuthorizationException]) + @Test(expected = classOf[KafkaException]) def testCommitWithNoTopicAccess() { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) this.consumers.head.commitSync(Map(tp -> new OffsetAndMetadata(5)).asJava) @@ -512,11 +516,35 @@ class AuthorizerIntegrationTest extends BaseRequestTest { this.consumers.head.position(tp) } - @Test(expected = classOf[TopicAuthorizationException]) + @Test(expected = classOf[TestTimeoutException]) def testOffsetFetchWithNoTopicAccess() { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) this.consumers.head.assign(List(tp).asJava) - this.consumers.head.position(tp) + + try { + var caughtExc : Exception = null + var positionSucceeded = false + val t = new java.lang.Thread() { + override def run () { + try { + AuthorizerIntegrationTest.this.consumers.head.position(tp) // consumer is stuck in org.apache.kafka.clients.consumer.internals.Fetcher.listOffset(TopicPartition, long) + positionSucceeded = true + } catch { + case e : Exception => caughtExc = e + } + } + } + t.start() + t.join(10000L) + + if (caughtExc != null) + throw caughtExc + + if (!positionSucceeded) + throw new TestTimeoutException("Failed to position in 10000 millis.") + } finally { + this.consumers.foreach(_.wakeup()) + } } @Test @@ -537,10 +565,8 @@ class AuthorizerIntegrationTest 
extends BaseRequestTest { @Test def testListOffsetsWithNoTopicAccess() { - val e = intercept[TopicAuthorizationException] { - this.consumers.head.partitionsFor(topic) - } - assertEquals(Set(topic), e.unauthorizedTopics().asScala) + val partitionInfos = this.consumers.head.partitionsFor(topic); + assertNull(partitionInfos) } @Test @@ -635,21 +661,42 @@ class AuthorizerIntegrationTest extends BaseRequestTest { part: Int = part) { val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]() val maxIters = numRecords * 50 - var iters = 0 - while (records.size < numRecords) { - for (record <- consumer.poll(50).asScala) { - records.add(record) + + var caughtExc : Exception = null + + val t = new java.lang.Thread() { + override def run () { + println("in consumeRecords.Thread.run") + try { + var iters = 0 + while (records.size < numRecords) { + for (record <- consumer.poll(50).asScala) { + records.add(record) + } + if (iters > maxIters) + throw new TestTimeoutException("Failed to consume the expected records after " + iters + " iterations.") + iters += 1 + } + } catch { + case e : Exception => caughtExc = e + } } - if (iters > maxIters) - throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.") - iters += 1 } + t.start() + t.join(10000L) + + if (caughtExc != null) + throw caughtExc + + if (records.isEmpty()) + throw new TestTimeoutException("Failed to consume the expected records after 10000 millis.") + for (i <- 0 until numRecords) { val record = records.get(i) val offset = startingOffset + i assertEquals(topic, record.topic()) assertEquals(part, record.partition()) assertEquals(offset.toLong, record.offset()) - } + } } } diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index 8edb6f85aa89f..14c99888d035a 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ 
b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -107,6 +107,26 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { s"--topic=$topic", s"--producer", s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") + def describeAclArgs: Array[String] = Array("--authorizer-properties", + s"zookeeper.connect=$zkConnect", + s"--add", + s"--topic=$topic", + s"--operation=Describe", + s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") + def deleteDescribeAclArgs: Array[String] = Array("--authorizer-properties", + s"zookeeper.connect=$zkConnect", + s"--remove", + s"--force", + s"--topic=$topic", + s"--operation=Describe", + s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") + def deleteWriteAclArgs: Array[String] = Array("--authorizer-properties", + s"zookeeper.connect=$zkConnect", + s"--remove", + s"--force", + s"--topic=$topic", + s"--operation=Write", + s"--allow-principal=$kafkaPrincipalType:$clientPrincipal") def consumeAclArgs: Array[String] = Array("--authorizer-properties", s"zookeeper.connect=$zkConnect", s"--add", @@ -135,6 +155,8 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") this.serverConfig.setProperty(KafkaConfig.MinInSyncReplicasProp, "3") this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group") + //to have testNoProduceAclWithoutDescribeAcl terminate quicker + this.producerConfig.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000") /** * Starts MiniKDC and only then sets up the parent trait. @@ -203,23 +225,68 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { * isn't set. 
*/ @Test - def testNoProduceAcl { + def testNoProduceAclWithoutDescribeAcl { //Produce records debug("Starting to send records") try{ sendRecords(numRecords, tp) - fail("Topic authorization exception expected") + fail("exception expected") } catch { - case e: TopicAuthorizationException => //expected + case e: org.apache.kafka.common.errors.TimeoutException => //expected } } - /** + @Test + def testNoProduceAclWithDescribeAcl { + AclCommand.main(describeAclArgs) + servers.foreach(s => { + TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.apis.authorizer.get, topicResource) + }) + //Produce records + debug("Starting to send records") + try{ + sendRecords(numRecords, tp) + fail("exception expected") + } catch { + case e: TopicAuthorizationException => //expected + } + } + + /** * Tests that a consumer fails to consume messages without the appropriate * ACL set. */ @Test - def testNoConsumeAcl { + def testNoConsumeWithoutDescribeAclViaAssign { + noConsumeWithoutDescribeAclProlog + consumers.head.assign(List(tp).asJava) + + try { + consumeRecords(this.consumers.head) + fail("exception expected") + } catch { + case e: TestTimeoutException => //expected from consumeRecords() + } finally { + //to avoid a ConcurrentModificationException in tearDown() + //as the consumer is stuck in poll() + consumers.foreach(_.wakeup()) + } + } + + @Test + def testNoConsumeWithoutDescribeAclViaSubscribe { + noConsumeWithoutDescribeAclProlog + consumers.head.subscribe(List(topic).asJava) + + try { + consumeRecords(this.consumers.head) + fail("exception expected") + } catch { + case e: TestTimeoutException => //expected from consumeRecords() + } + } + + private def noConsumeWithoutDescribeAclProlog { AclCommand.main(produceAclArgs) AclCommand.main(groupAclArgs) servers.foreach(s => { @@ -229,9 +296,26 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { //Produce records debug("Starting to send records") sendRecords(numRecords, tp) - //Consume records + + 
//Deleting topic ACL without asking for console confirmation + AclCommand.main(deleteDescribeAclArgs) + AclCommand.main(deleteWriteAclArgs) + servers.foreach(s => { + TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource) + }) + debug("Finished sending and starting to consume records") + } + + /** + * Tests that a consumer fails to consume messages without the appropriate + * ACL set. + */ + @Test + def testNoConsumeAclWithDescribeAclViaAssign { + noConsumeAclWithDescribeAclProlog consumers.head.assign(List(tp).asJava) + try { consumeRecords(this.consumers.head) fail("Topic authorization exception expected") @@ -239,6 +323,33 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { case e: TopicAuthorizationException => //expected } } + + @Test + def testNoConsumeAclWithDescribeAclViaSubscribe { + noConsumeAclWithDescribeAclProlog + consumers.head.subscribe(List(topic).asJava) + + try { + consumeRecords(this.consumers.head) + fail("Topic authorization exception expected") + } catch { + case e: TopicAuthorizationException => //expected + } + } + + private def noConsumeAclWithDescribeAclProlog { + AclCommand.main(produceAclArgs) + AclCommand.main(groupAclArgs) + servers.foreach(s => { + TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) + TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource) + }) + //Produce records + debug("Starting to send records") + sendRecords(numRecords, tp) + //Consume records + debug("Finished sending and starting to consume records") + } /** * Tests that a consumer fails to consume messages without the appropriate @@ -284,15 +395,35 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { part: Int = part) { val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]() val maxIters = numRecords * 50 - var iters = 0 - while (records.size < numRecords) { - for 
(record <- consumer.poll(50).asScala) { - records.add(record) + + var caughtExc : Exception = null + + val t = new java.lang.Thread() { + override def run () { + try { + var iters = 0 + while (records.size < numRecords) { + for (record <- consumer.poll(50).asScala) { + records.add(record) + } + if (iters > maxIters) + throw new TestTimeoutException("Failed to consume the expected records after " + iters + " iterations.") + iters += 1 + } + } catch { + case e : Exception => caughtExc = e + } } - if (iters > maxIters) - throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.") - iters += 1 } + t.start() + t.join(10000L) + + if (caughtExc != null) + throw caughtExc + + if (records.isEmpty()) + throw new TestTimeoutException("Failed to consume the expected records after 10000 millis.") + for (i <- 0 until numRecords) { val record = records.get(i) val offset = startingOffset + i @@ -302,3 +433,7 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { } } } + +class TestTimeoutException (msg:String) + extends java.lang.RuntimeException (msg) {} + diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 3796e48880d45..8bd3c41386994 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -455,9 +455,11 @@ object TestUtils extends Logging { props: Option[Properties] = None): KafkaProducer[K, V] = { val producerProps = props.getOrElse(new Properties) + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString) - producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs.toString) + if(!producerProps.containsKey(ProducerConfig.MAX_BLOCK_MS_CONFIG)) + producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs.toString) producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 
bufferSize.toString) producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString) producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs.toString) From 9ebc96bffb397586b1a3ba8a7677c0c441483500 Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Tue, 26 Jul 2016 19:20:06 +0100 Subject: [PATCH 02/14] KAFKA-3396: Updates and cleanups following the feedback --- .../main/scala/kafka/server/KafkaApis.scala | 52 +++++------ .../kafka/api/AuthorizerIntegrationTest.scala | 90 ++++++------------- .../kafka/api/EndToEndAuthorizationTest.scala | 87 ++++++++---------- .../scala/unit/kafka/utils/TestUtils.scala | 3 - 4 files changed, 91 insertions(+), 141 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index aa606a2583ef0..4acb49f13ecf8 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -233,35 +233,32 @@ class KafkaApis(val requestChannel: RequestChannel, val responseBody = new OffsetCommitResponse(results.asJava) requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody))) } else { - // filter non-existent topics - val invalidRequestsInfo = offsetCommitRequest.offsetData.asScala.filter { case (topicPartition, _) => - !metadataCache.contains(topicPartition.topic) + + val (nonExistingRequestsInfo, existingTopicsRequestsInfo) = offsetCommitRequest.offsetData.asScala.toMap.partition { + case (topicPartition, _) => !metadataCache.contains(topicPartition.topic) } - val filteredRequestInfo1 = offsetCommitRequest.offsetData.asScala.toMap -- invalidRequestsInfo.keys - - //filter topics with Describe ACL as if they were non-existent - val (filteredRequestInfo2, unauthorizedForDescribeRequestInfo) = filteredRequestInfo1.partition { + // treat topics with Describe ACL as if they were non-existent + val (filteredRequestInfo, 
unauthorizedForDescribeRequestInfo) = existingTopicsRequestsInfo.partition { case (topicPartition, offsetMetadata) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) } - - val (authorizedRequestInfo, unauthorizedRequestInfo) = filteredRequestInfo2.partition { + + val (authorizedRequestInfo, unauthorizedRequestInfo) = filteredRequestInfo.partition { case (topicPartition, offsetMetadata) => authorize(request.session, Read, new Resource(auth.Topic, topicPartition.topic)) } - + // the callback for sending an offset commit response def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Short]) { val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED.code) + var combinedCommitStatus = mergedCommitStatus.mapValues(new JShort(_)) ++ nonExistingRequestsInfo.map(_._1 -> new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)) + combinedCommitStatus = combinedCommitStatus ++ unauthorizedForDescribeRequestInfo.mapValues(_ => Errors.UNKNOWN_TOPIC_OR_PARTITION.code).mapValues(new JShort(_)) - mergedCommitStatus.foreach { case (topicPartition, errorCode) => + combinedCommitStatus.foreach { case (topicPartition, errorCode) => if (errorCode != Errors.NONE.code) { debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " + s"on partition $topicPartition failed due to ${Errors.forCode(errorCode).exceptionName}") } } - var combinedCommitStatus = mergedCommitStatus.mapValues(new JShort(_)) ++ invalidRequestsInfo.map(_._1 -> new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)) - combinedCommitStatus = combinedCommitStatus ++ unauthorizedForDescribeRequestInfo.mapValues(_ => Errors.UNKNOWN_TOPIC_OR_PARTITION.code).mapValues(new JShort(_)) - val responseHeader = new ResponseHeader(header.correlationId) val responseBody = new OffsetCommitResponse(combinedCommitStatus.asJava) requestChannel.sendResponse(new RequestChannel.Response(request, 
new ResponseSend(request.connectionId, responseHeader, responseBody))) @@ -811,10 +808,10 @@ class KafkaApis(val requestChannel: RequestChannel, metadataRequest.topics.asScala.toSet } - var (authorizedTopics, unauthorizedTopics) = + var (authorizedTopics, unauthorizedForDescribeTopics) = topics.partition(topic => authorize(request.session, Describe, new Resource(auth.Topic, topic))) - var uncreatableTopics = Set[String]() + var unauthorizedForCreateTopics = Set[String]() if (authorizedTopics.nonEmpty) { val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics) @@ -822,21 +819,25 @@ class KafkaApis(val requestChannel: RequestChannel, authorizer.foreach { az => if (!az.authorize(request.session, Create, Resource.ClusterResource)) { authorizedTopics --= nonExistingTopics - uncreatableTopics ++= nonExistingTopics + unauthorizedForCreateTopics ++= nonExistingTopics } } } } - val uncreatableTopicMetadata = uncreatableTopics.map(topic => + val uncreatableTopicMetadata = unauthorizedForCreateTopics.map(topic => new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, common.Topic.isInternal(topic), java.util.Collections.emptyList())) - // do not disclose the existence of the unauthorized topic - val unauthorizedTopicMetadata = unauthorizedTopics.map(topic => - new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, common.Topic.isInternal(topic), - java.util.Collections.emptyList())) - + // do not disclose the existence of unauthorized topics + val unauthorizedTopicMetadata = + // In case of all topics, don't include unauthorized topics + if ((requestVersion == 0 && (metadataRequest.topics() == null || metadataRequest.topics().isEmpty)) || (metadataRequest.isAllTopics)) + Set.empty[MetadataResponse.TopicMetadata] + else + unauthorizedForDescribeTopics.map(topic => + new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, common.Topic.isInternal(topic), java.util.Collections.emptyList())) + // 
In version 0, we returned an error when brokers with replicas were unavailable, // while in higher versions we simply don't include the broker in the returned broker list val errorUnavailableEndpoints = requestVersion == 0 @@ -884,6 +885,7 @@ class KafkaApis(val requestChannel: RequestChannel, authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) } val unknownTopicPartitionResponse = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.UNKNOWN_TOPIC_OR_PARTITION.code) + val unauthorizedStatus = unauthorizedTopicPartitions.map(topicPartition => (topicPartition, unknownTopicPartitionResponse)).toMap if (header.apiVersion == 0) { // version 0 reads offsets from ZK @@ -907,7 +909,7 @@ class KafkaApis(val requestChannel: RequestChannel, Errors.forException(e).code)) } }.toMap - new OffsetFetchResponse((responseInfo).asJava) + new OffsetFetchResponse((responseInfo ++ unauthorizedStatus).asJava) } else { // version 1 reads offsets from Kafka; val offsets = coordinator.handleFetchOffsets(offsetFetchRequest.groupId, authorizedTopicPartitions).toMap @@ -915,7 +917,7 @@ class KafkaApis(val requestChannel: RequestChannel, // Note that we do not need to filter the partitions in the // metadata cache as the topic partitions will be filtered // in coordinator's offset manager through the offset cache - new OffsetFetchResponse((offsets).asJava) + new OffsetFetchResponse((offsets ++ unauthorizedStatus).asJava) } } diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index aa8538280f053..66bb46a01dcc2 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -37,6 +37,12 @@ import org.junit.{After, Assert, Before, Test} import scala.collection.JavaConverters._ import scala.collection.mutable import 
scala.collection.mutable.Buffer +import scala.concurrent.{Await, Future} +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ +import scala.util.{Failure, Success} + +import org.apache.kafka.common.KafkaException @@ -283,9 +289,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { def testProduceWithNoTopicAccess() { try { sendRecords(numRecords, tp) - fail("sendRecords should have thrown") + fail("should have thrown exception") } catch { - case e: org.apache.kafka.common.errors.TimeoutException => //expected + case e: TimeoutException => //expected } } @@ -294,7 +300,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) try { sendRecords(numRecords, tp) - fail("sendRecords should have thrown") + fail("should have thrown exception") } catch { case e: TopicAuthorizationException => assertEquals(Collections.singleton(topic), e.unauthorizedTopics()) @@ -306,7 +312,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) try { sendRecords(numRecords, tp) - fail("sendRecords should have thrown") + fail("should have thrown exception") } catch { case e: TopicAuthorizationException => assertEquals(Collections.singleton(topic), e.unauthorizedTopics()) @@ -377,7 +383,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } @Test - def testConsumeWithNoTopicAccess() { + def testConsumeWithoutTopicDescribeAccess() { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) sendRecords(1, tp) removeAllAcls() @@ -388,9 +394,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { consumeRecords(this.consumers.head) Assert.fail("should have thrown exception") } catch { - case e: TestTimeoutException => //expected - } finally { - 
this.consumers.foreach(_.wakeup()) + case e: KafkaException => //expected } } @@ -516,35 +520,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest { this.consumers.head.position(tp) } - @Test(expected = classOf[TestTimeoutException]) + @Test(expected = classOf[KafkaException]) def testOffsetFetchWithNoTopicAccess() { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) this.consumers.head.assign(List(tp).asJava) - - try { - var caughtExc : Exception = null - var positionSucceeded = false - val t = new java.lang.Thread() { - override def run () { - try { - AuthorizerIntegrationTest.this.consumers.head.position(tp) // consumer is stuck in org.apache.kafka.clients.consumer.internals.Fetcher.listOffset(TopicPartition, long) - positionSucceeded = true - } catch { - case e : Exception => caughtExc = e - } - } - } - t.start() - t.join(10000L) - - if (caughtExc != null) - throw caughtExc - - if (!positionSucceeded) - throw new TestTimeoutException("Failed to position in 10000 millis.") - } finally { - this.consumers.foreach(_.wakeup()) - } + this.consumers.head.position(tp) } @Test @@ -619,7 +599,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val possibleErrorCodes = resources.flatMap { resourceType => if(resourceType == Topic) // When completely unauthorized topic resources may return an INVALID_TOPIC_EXCEPTION to prevent leaking topic names - Seq(resourceType.errorCode, Errors.INVALID_TOPIC_EXCEPTION.code()) + Seq(resourceType.errorCode, Errors.INVALID_TOPIC_EXCEPTION.code(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) else Seq(resourceType.errorCode) } @@ -661,35 +641,20 @@ class AuthorizerIntegrationTest extends BaseRequestTest { part: Int = part) { val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]() val maxIters = numRecords * 50 - - var caughtExc : Exception = null - - val t = new java.lang.Thread() { - override def run () { - println("in consumeRecords.Thread.run") - try { - 
var iters = 0 - while (records.size < numRecords) { - for (record <- consumer.poll(50).asScala) { - records.add(record) - } - if (iters > maxIters) - throw new TestTimeoutException("Failed to consume the expected records after " + iters + " iterations.") - iters += 1 - } - } catch { - case e : Exception => caughtExc = e + + val future = Future { + var iters = 0 + while (records.size < numRecords) { + for (record <- consumer.poll(50).asScala) { + records.add(record) } + if (iters > maxIters) + throw new TestTimeoutException("Failed to consume the expected records after " + iters + " iterations.") + iters += 1 } + records } - t.start() - t.join(10000L) - - if (caughtExc != null) - throw caughtExc - - if (records.isEmpty()) - throw new TestTimeoutException("Failed to consume the expected records after 10000 millis.") + val result = Await.result(future, 10 seconds) for (i <- 0 until numRecords) { val record = records.get(i) @@ -697,6 +662,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { assertEquals(topic, record.topic()) assertEquals(part, record.partition()) assertEquals(offset.toLong, record.offset()) - } + } } + } diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index 14c99888d035a..8567e75b021b8 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -30,13 +30,17 @@ import kafka.utils._ import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, ConsumerConfig} import org.apache.kafka.clients.producer.{ProducerRecord, ProducerConfig} import org.apache.kafka.common.security.auth.KafkaPrincipal -import org.apache.kafka.common.{TopicPartition} +import org.apache.kafka.common.{TopicPartition,KafkaException} import org.apache.kafka.common.protocol.SecurityProtocol -import 
org.apache.kafka.common.errors.{GroupAuthorizationException,TopicAuthorizationException} +import org.apache.kafka.common.errors.{GroupAuthorizationException,TopicAuthorizationException,TimeoutException} import org.junit.Assert._ import org.junit.{Test, After, Before} import scala.collection.JavaConverters._ +import scala.concurrent.{Await, Future} +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ +import scala.util.{Failure, Success} /** @@ -171,9 +175,9 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { } super.setUp AclCommand.main(topicBrokerReadAclArgs) - servers.foreach( s => + servers.foreach { s => TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, new Resource(Topic, "*")) - ) + } // create the test topic with all the brokers as replicas TestUtils.createTopic(zkUtils, topic, 1, 3, this.servers) } @@ -209,10 +213,10 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { private def setAclsAndProduce() { AclCommand.main(produceAclArgs) AclCommand.main(consumeAclArgs) - servers.foreach(s => { + servers.foreach { s => TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) - }) + } //Produce records debug("Starting to send records") sendRecords(numRecords, tp) @@ -232,16 +236,16 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { sendRecords(numRecords, tp) fail("exception expected") } catch { - case e: org.apache.kafka.common.errors.TimeoutException => //expected + case e: TimeoutException => //expected } } @Test def testNoProduceAclWithDescribeAcl { AclCommand.main(describeAclArgs) - servers.foreach(s => { + servers.foreach { s => TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.apis.authorizer.get, topicResource) - }) + } //Produce records debug("Starting to send 
records") try{ @@ -265,11 +269,7 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { consumeRecords(this.consumers.head) fail("exception expected") } catch { - case e: TestTimeoutException => //expected from consumeRecords() - } finally { - //to avoid a ConcurrentModificationException in tearDown() - //as the consumer is stuck in poll() - consumers.foreach(_.wakeup()) + case e: KafkaException => //expected } } @@ -282,27 +282,27 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { consumeRecords(this.consumers.head) fail("exception expected") } catch { - case e: TestTimeoutException => //expected from consumeRecords() + case e: TestTimeoutException => //expected } } private def noConsumeWithoutDescribeAclProlog { AclCommand.main(produceAclArgs) AclCommand.main(groupAclArgs) - servers.foreach(s => { + servers.foreach { s => TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) - }) + } //Produce records debug("Starting to send records") sendRecords(numRecords, tp) - //Deleting topic ACL without asking for console confirmation + //Deleting topic ACL AclCommand.main(deleteDescribeAclArgs) AclCommand.main(deleteWriteAclArgs) - servers.foreach(s => { + servers.foreach { s => TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource) - }) + } debug("Finished sending and starting to consume records") } @@ -340,10 +340,10 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { private def noConsumeAclWithDescribeAclProlog { AclCommand.main(produceAclArgs) AclCommand.main(groupAclArgs) - servers.foreach(s => { + servers.foreach { s => TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) TestUtils.waitAndVerifyAcls(GroupReadAcl, servers.head.apis.authorizer.get, groupResource) - }) + } 
//Produce records debug("Starting to send records") sendRecords(numRecords, tp) @@ -358,9 +358,9 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { @Test def testNoGroupAcl { AclCommand.main(produceAclArgs) - servers.foreach(s => + servers.foreach { s => TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl, s.apis.authorizer.get, topicResource) - ) + } //Produce records debug("Starting to send records") sendRecords(numRecords, tp) @@ -396,33 +396,19 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]() val maxIters = numRecords * 50 - var caughtExc : Exception = null - - val t = new java.lang.Thread() { - override def run () { - try { - var iters = 0 - while (records.size < numRecords) { - for (record <- consumer.poll(50).asScala) { - records.add(record) - } - if (iters > maxIters) - throw new TestTimeoutException("Failed to consume the expected records after " + iters + " iterations.") - iters += 1 - } - } catch { - case e : Exception => caughtExc = e + val future = Future { + var iters = 0 + while (records.size < numRecords) { + for (record <- consumer.poll(50).asScala) { + records.add(record) } + if (iters > maxIters) + throw new TestTimeoutException("Failed to consume the expected records after " + iters + " iterations.") + iters += 1 } + records } - t.start() - t.join(10000L) - - if (caughtExc != null) - throw caughtExc - - if (records.isEmpty()) - throw new TestTimeoutException("Failed to consume the expected records after 10000 millis.") + val result = Await.result(future, 10 seconds) for (i <- 0 until numRecords) { val record = records.get(i) @@ -430,10 +416,9 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { assertEquals(topic, record.topic()) assertEquals(part, record.partition()) assertEquals(offset.toLong, record.offset()) - } + } } } -class TestTimeoutException (msg:String) - 
extends java.lang.RuntimeException (msg) {} +class TestTimeoutException(msg:String) extends java.lang.RuntimeException (msg) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 8bd3c41386994..af5fffd0a49cc 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -455,11 +455,8 @@ object TestUtils extends Logging { props: Option[Properties] = None): KafkaProducer[K, V] = { val producerProps = props.getOrElse(new Properties) - producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString) - if(!producerProps.containsKey(ProducerConfig.MAX_BLOCK_MS_CONFIG)) - producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs.toString) producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString) producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString) producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs.toString) From d47bac1ed000a5f76f24b4a03304f0ff1e105f81 Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Wed, 17 Aug 2016 13:34:19 +0100 Subject: [PATCH 03/14] KAFKA-3396: More updates + rebased off trunk - Added tests from 44ad3ec - Small refactorings --- .../main/scala/kafka/server/KafkaApis.scala | 35 +++---- .../kafka/api/AuthorizerIntegrationTest.scala | 93 +++++++++++++++++-- 2 files changed, 100 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 4acb49f13ecf8..ebf3b2f30dcbf 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -234,24 +234,19 @@ class KafkaApis(val requestChannel: RequestChannel, requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody))) } else { - 
val (nonExistingRequestsInfo, existingTopicsRequestsInfo) = offsetCommitRequest.offsetData.asScala.toMap.partition { - case (topicPartition, _) => !metadataCache.contains(topicPartition.topic) + val (existingOrAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = offsetCommitRequest.offsetData.asScala.toMap.partition { + case (topicPartition, _) => metadataCache.contains(topicPartition.topic) && authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) } - // treat topics with Describe ACL as if they were non-existent - val (filteredRequestInfo, unauthorizedForDescribeRequestInfo) = existingTopicsRequestsInfo.partition { - case (topicPartition, offsetMetadata) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) - } - - val (authorizedRequestInfo, unauthorizedRequestInfo) = filteredRequestInfo.partition { - case (topicPartition, offsetMetadata) => authorize(request.session, Read, new Resource(auth.Topic, topicPartition.topic)) + val (authorizedTopics, unauthorizedForReadTopics) = existingOrAuthorizedForDescribeTopics.partition { + case (topicPartition, _) => authorize(request.session, Read, new Resource(auth.Topic, topicPartition.topic)) } // the callback for sending an offset commit response def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Short]) { - val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED.code) - var combinedCommitStatus = mergedCommitStatus.mapValues(new JShort(_)) ++ nonExistingRequestsInfo.map(_._1 -> new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)) - combinedCommitStatus = combinedCommitStatus ++ unauthorizedForDescribeRequestInfo.mapValues(_ => Errors.UNKNOWN_TOPIC_OR_PARTITION.code).mapValues(new JShort(_)) + var combinedCommitStatus = commitStatus.mapValues(new JShort(_)) ++ + unauthorizedForReadTopics.mapValues(_ => new JShort(Errors.TOPIC_AUTHORIZATION_FAILED.code)) ++ + 
nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)) combinedCommitStatus.foreach { case (topicPartition, errorCode) => if (errorCode != Errors.NONE.code) { @@ -264,11 +259,11 @@ class KafkaApis(val requestChannel: RequestChannel, requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody))) } - if (authorizedRequestInfo.isEmpty) + if (authorizedTopics.isEmpty) sendResponseCallback(Map.empty) else if (header.apiVersion == 0) { // for version 0 always store offsets to ZK - val responseInfo = authorizedRequestInfo.map { + val responseInfo = authorizedTopics.map { case (topicPartition, partitionData) => val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicPartition.topic) try { @@ -305,7 +300,7 @@ class KafkaApis(val requestChannel: RequestChannel, // - If v2 we use the default expiration timestamp val currentTimestamp = SystemTime.milliseconds val defaultExpireTimestamp = offsetRetention + currentTimestamp - val partitionData = authorizedRequestInfo.mapValues { partitionData => + val partitionData = authorizedTopics.mapValues { partitionData => val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata new OffsetAndMetadata( offsetMetadata = OffsetMetadata(partitionData.offset, metadata), @@ -779,7 +774,7 @@ class KafkaApis(val requestChannel: RequestChannel, } else if (config.autoCreateTopicsEnable) { createTopic(topic, config.numPartitions, config.defaultReplicationFactor) } else { - new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, common.Topic.isInternal(topic), + new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, java.util.Collections.emptyList()) } } @@ -825,12 +820,12 @@ class KafkaApis(val requestChannel: RequestChannel, } } - val uncreatableTopicMetadata = unauthorizedForCreateTopics.map(topic => + val 
unauthorizedForCreateTopicMetadata = unauthorizedForCreateTopics.map(topic => new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, common.Topic.isInternal(topic), java.util.Collections.emptyList())) // do not disclose the existence of unauthorized topics - val unauthorizedTopicMetadata = + val unauthorizedForDescribeTopicMetadata = // In case of all topics, don't include unauthorized topics if ((requestVersion == 0 && (metadataRequest.topics() == null || metadataRequest.topics().isEmpty)) || (metadataRequest.isAllTopics)) Set.empty[MetadataResponse.TopicMetadata] @@ -847,7 +842,7 @@ class KafkaApis(val requestChannel: RequestChannel, else getTopicMetadata(authorizedTopics, request.securityProtocol, errorUnavailableEndpoints) - val completeTopicMetadata = topicMetadata ++ uncreatableTopicMetadata ++ unauthorizedTopicMetadata + val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata val brokers = metadataCache.getAliveBrokers @@ -1210,7 +1205,7 @@ class KafkaApis(val requestChannel: RequestChannel, }.toMap sendResponseCallback(results) } else { - // If no authorized topics return immediatly + // If no authorized topics return immediately if (authorizedTopics.isEmpty) sendResponseCallback(Map()) else { diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 66bb46a01dcc2..0b2c2d09efbb3 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -15,6 +15,7 @@ package kafka.api import java.nio.ByteBuffer import java.util import java.util.concurrent.ExecutionException +import java.util.regex.Pattern import java.util.{ArrayList, Collections, Properties} import kafka.common @@ -22,9 +23,9 @@ import kafka.common.TopicAndPartition import kafka.security.auth._ import 
kafka.server.{BaseRequestTest, KafkaConfig} import kafka.utils.TestUtils -import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, KafkaConsumer, OffsetAndMetadata} +import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener +import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} -import org.apache.kafka.common.KafkaException import org.apache.kafka.common.errors._ import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol} import org.apache.kafka.common.requests._ @@ -44,14 +45,13 @@ import scala.util.{Failure, Success} import org.apache.kafka.common.KafkaException - - class AuthorizerIntegrationTest extends BaseRequestTest { override def numBrokers: Int = 1 val brokerId: Integer = 0 val topic = "topic" + val topicPattern = "topic.*" val createTopic = "topic-new" val deleteTopic = "topic-delete" val part = 0 @@ -411,7 +411,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { consumeRecords(this.consumers.head) Assert.fail("should have thrown exception") } catch { - case e: TopicAuthorizationException => assertEquals(Collections.singleton(topic), e.unauthorizedTopics()); + case e: TopicAuthorizationException => assertEquals(Collections.singleton(topic), e.unauthorizedTopics()) } } @@ -429,7 +429,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { Assert.fail("should have thrown exception") } catch { case e: TopicAuthorizationException => - assertEquals(Collections.singleton(topic), e.unauthorizedTopics()); + assertEquals(Collections.singleton(topic), e.unauthorizedTopics()) } } @@ -445,6 +445,83 @@ class AuthorizerIntegrationTest extends BaseRequestTest { consumeRecords(this.consumers.head) } + @Test + def testPatternSubscriptionWithNoTopicAccess() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(1, tp) + removeAllAcls() + + addAndVerifyAcls(Set(new 
Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + this.consumers.head.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener) + this.consumers.head.poll(50) + assertTrue(this.consumers.head.subscription().isEmpty()) + } + + @Test + def testPatternSubscriptionWithTopicAndGroupRead() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(1, tp) + + //create a unmatched topic + val unmatchedTopic = "unmatched" + TestUtils.createTopic(zkUtils, unmatchedTopic, 1, 1, this.servers) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), new Resource(Topic, unmatchedTopic)) + sendRecords(1, new TopicPartition(unmatchedTopic, part)) + removeAllAcls() + + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + val consumer = consumers.head + consumer.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener) + consumeRecords(consumer) + + // set the subscription pattern to an internal topic that the consumer has no read permission for, but since + // `exclude.internal.topics` is true by default, the subscription should be empty and no authorization exception + // should be thrown + consumer.subscribe(Pattern.compile(kafka.common.Topic.GroupMetadataTopicName), new NoOpConsumerRebalanceListener) + assertTrue(consumer.poll(50).isEmpty) + } + + @Test + def testPatternSubscriptionMatchingInternalTopicWithNoPermission() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(1, tp) + removeAllAcls() + + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), 
groupResource) + + val consumerConfig = new Properties + consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false") + val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, + securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(consumerConfig)) + try { + consumer.subscribe(Pattern.compile(".*"), new NoOpConsumerRebalanceListener) + consumeRecords(consumer) + } catch { + case e: TestTimeoutException => //expected + } finally consumer.close() + } + + @Test + def testPatternSubscriptionNotMatchingInternalTopic() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(1, tp) + removeAllAcls() + + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + + val consumerConfig = new Properties + consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false") + val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, + securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(consumerConfig)) + try { + consumer.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener) + consumeRecords(consumer) + } finally consumer.close() +} + @Test def testCreatePermissionNeededToReadFromNonExistentTopic() { val newTopic = "newTopic" @@ -459,7 +536,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { Assert.fail("should have thrown exception") } catch { case e: TopicAuthorizationException => - assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics()); + assertEquals(Collections.singleton(newTopic), e.unauthorizedTopics()) } addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), newTopicResource) @@ -545,7 +622,7 @@ class AuthorizerIntegrationTest extends 
BaseRequestTest { @Test def testListOffsetsWithNoTopicAccess() { - val partitionInfos = this.consumers.head.partitionsFor(topic); + val partitionInfos = this.consumers.head.partitionsFor(topic) assertNull(partitionInfos) } From f69e0aa4e725cb8f3e9619b8ccf24c73185006ee Mon Sep 17 00:00:00 2001 From: Edoardo Comar Date: Wed, 21 Sep 2016 10:34:02 +0100 Subject: [PATCH 04/14] KAFKA-3396 : Unauthorized topics are returned to the user Rebased after kip-79 changes. Fixing leak of topic for LIST_OFFSETS when unauthorized. Added tests. --- .../main/scala/kafka/server/KafkaApis.scala | 6 ++-- .../kafka/api/AuthorizerIntegrationTest.scala | 36 ++++++++++++++++--- 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index ebf3b2f30dcbf..267cb8c0bca8d 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -553,7 +553,7 @@ class KafkaApis(val requestChannel: RequestChannel, } val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => - new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, List[JLong]().asJava) + new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, List[JLong]().asJava) ) val responseMap = authorizedRequestInfo.map {case (topicPartition, partitionData) => @@ -604,7 +604,7 @@ class KafkaApis(val requestChannel: RequestChannel, } val unauthorizedResponseStatus = unauthorizedRequestInfo.mapValues(_ => { - new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, + new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET) }) @@ -831,7 +831,7 @@ class KafkaApis(val requestChannel: RequestChannel, Set.empty[MetadataResponse.TopicMetadata] else unauthorizedForDescribeTopics.map(topic => - new 
MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, common.Topic.isInternal(topic), java.util.Collections.emptyList())) + new MetadataResponse.TopicMetadata(Errors.UNKNOWN_TOPIC_OR_PARTITION, topic, false, java.util.Collections.emptyList())) // In version 0, we returned an error when brokers with replicas were unavailable, // while in higher versions we simply don't include the broker in the returned broker list diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 0b2c2d09efbb3..70801e4c0e641 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -44,6 +44,8 @@ import scala.concurrent.duration._ import scala.util.{Failure, Success} import org.apache.kafka.common.KafkaException +import java.util.HashMap +import kafka.admin.AdminUtils class AuthorizerIntegrationTest extends BaseRequestTest { @@ -58,6 +60,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val correlationId = 0 val clientId = "client-Id" val tp = new TopicPartition(topic, part) + val topicAndPartition = new TopicAndPartition(topic, part) val group = "my-group" val topicResource = new Resource(Topic, topic) @@ -285,6 +288,26 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } } + /* + * checking that whether the topic exists or not, when unauthorized, FETCH and PRODUCE do not leak the topic name + */ + @Test + def testAuthorizationProduceFetchDoNotLeakTopicName() { + AdminUtils.deleteTopic(zkUtils, topic) + TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers) + + val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( + ApiKeys.PRODUCE -> createProduceRequest, + ApiKeys.FETCH -> createFetchRequest + ) + + for ((key, request) <- requestKeyToRequest) { + removeAllAcls + val resources = 
RequestKeysToAcls(key).map(_._1.resourceType).toSet + sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = false) + } + } + @Test def testProduceWithNoTopicAccess() { try { @@ -627,7 +650,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } @Test - def testListOfsetsWithTopicDescribe() { + def testListOffsetsWithTopicDescribe() { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) this.consumers.head.partitionsFor(topic) } @@ -674,9 +697,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val errorCode = RequestKeyToErrorCode(apiKey).asInstanceOf[(AbstractRequestResponse) => Short](response) val possibleErrorCodes = resources.flatMap { resourceType => - if(resourceType == Topic) - // When completely unauthorized topic resources may return an INVALID_TOPIC_EXCEPTION to prevent leaking topic names - Seq(resourceType.errorCode, Errors.INVALID_TOPIC_EXCEPTION.code(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + if (resourceType == Topic) + if (apiKey == ApiKeys.PRODUCE || apiKey == ApiKeys.FETCH ) { + //Only allowing TOPIC_AUTHORIZATION_FAILED as an error code + Seq(resourceType.errorCode) + } else { + // When completely unauthorized topic resources may return an INVALID_TOPIC_EXCEPTION/UNKNOWN_TOPIC_OR_PARTITION to prevent leaking topic names + Seq(Errors.INVALID_TOPIC_EXCEPTION.code(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + } else Seq(resourceType.errorCode) } From 0ba2d79631debe603cd1fe1d0d131bd713bf5d85 Mon Sep 17 00:00:00 2001 From: Edoardo Comar Date: Fri, 23 Sep 2016 16:34:48 +0100 Subject: [PATCH 05/14] KAFKA-3396 : Unauthorized topics are returned to the user cleanup after review --- .../main/scala/kafka/server/KafkaApis.scala | 19 +++++----- .../kafka/api/AuthorizerIntegrationTest.scala | 24 ++++++------- .../kafka/api/EndToEndAuthorizationTest.scala | 36 +++++++------------ .../scala/unit/kafka/utils/TestUtils.scala | 1 + 4 files changed, 32 
insertions(+), 48 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 267cb8c0bca8d..35106d1b9de83 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -233,9 +233,8 @@ class KafkaApis(val requestChannel: RequestChannel, val responseBody = new OffsetCommitResponse(results.asJava) requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody))) } else { - val (existingOrAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = offsetCommitRequest.offsetData.asScala.toMap.partition { - case (topicPartition, _) => metadataCache.contains(topicPartition.topic) && authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) + case (topicPartition, _) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) && metadataCache.contains(topicPartition.topic) } val (authorizedTopics, unauthorizedForReadTopics) = existingOrAuthorizedForDescribeTopics.partition { @@ -807,15 +806,13 @@ class KafkaApis(val requestChannel: RequestChannel, topics.partition(topic => authorize(request.session, Describe, new Resource(auth.Topic, topic))) var unauthorizedForCreateTopics = Set[String]() - + if (authorizedTopics.nonEmpty) { val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics) if (config.autoCreateTopicsEnable && nonExistingTopics.nonEmpty) { - authorizer.foreach { az => - if (!az.authorize(request.session, Create, Resource.ClusterResource)) { - authorizedTopics --= nonExistingTopics - unauthorizedForCreateTopics ++= nonExistingTopics - } + if (!authorize(request.session, Create, Resource.ClusterResource)) { + authorizedTopics --= nonExistingTopics + unauthorizedForCreateTopics ++= nonExistingTopics } } } @@ -824,10 +821,10 @@ class KafkaApis(val requestChannel: RequestChannel, new 
MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, common.Topic.isInternal(topic), java.util.Collections.emptyList())) - // do not disclose the existence of unauthorized topics + // do not disclose the existence of topics unauthorized for Describe val unauthorizedForDescribeTopicMetadata = - // In case of all topics, don't include unauthorized topics - if ((requestVersion == 0 && (metadataRequest.topics() == null || metadataRequest.topics().isEmpty)) || (metadataRequest.isAllTopics)) + // In case of all topics, don't include topics unauthorized for Describe + if ((requestVersion == 0 && (metadataRequest.topics == null || metadataRequest.topics.isEmpty)) || metadataRequest.isAllTopics) Set.empty[MetadataResponse.TopicMetadata] else unauthorizedForDescribeTopics.map(topic => diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 70801e4c0e641..e2be31049984f 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -14,7 +14,7 @@ package kafka.api import java.nio.ByteBuffer import java.util -import java.util.concurrent.ExecutionException +import java.util.concurrent.{TimeoutException ⇒ JTimeoutException, ExecutionException} import java.util.regex.Pattern import java.util.{ArrayList, Collections, Properties} @@ -175,6 +175,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @After override def tearDown() = { producers.foreach(_.close()) + consumers.foreach(_.wakeup()) consumers.foreach(_.close()) removeAllAcls super.tearDown() @@ -522,7 +523,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { consumer.subscribe(Pattern.compile(".*"), new NoOpConsumerRebalanceListener) consumeRecords(consumer) } catch { - case e: TestTimeoutException => //expected + case e: JTimeoutException => //expected } finally 
consumer.close() } @@ -700,10 +701,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest { if (resourceType == Topic) if (apiKey == ApiKeys.PRODUCE || apiKey == ApiKeys.FETCH ) { //Only allowing TOPIC_AUTHORIZATION_FAILED as an error code - Seq(resourceType.errorCode) + Seq(Errors.TOPIC_AUTHORIZATION_FAILED.code) } else { // When completely unauthorized topic resources may return an INVALID_TOPIC_EXCEPTION/UNKNOWN_TOPIC_OR_PARTITION to prevent leaking topic names - Seq(Errors.INVALID_TOPIC_EXCEPTION.code(), Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + Seq(Errors.INVALID_TOPIC_EXCEPTION.code, Errors.UNKNOWN_TOPIC_OR_PARTITION.code) } else Seq(resourceType.errorCode) @@ -745,21 +746,16 @@ class AuthorizerIntegrationTest extends BaseRequestTest { topic: String = topic, part: Int = part) { val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]() - val maxIters = numRecords * 50 val future = Future { - var iters = 0 - while (records.size < numRecords) { - for (record <- consumer.poll(50).asScala) { + TestUtils.waitUntilTrue(() => { + for (record <- consumer.poll(50).asScala) records.add(record) - } - if (iters > maxIters) - throw new TestTimeoutException("Failed to consume the expected records after " + iters + " iterations.") - iters += 1 - } + records.size >= numRecords + }, "Failed to consume %d records".format(numRecords), 10000L) records } - val result = Await.result(future, 10 seconds) + val result = Await.result(future, 15 seconds) for (i <- 0 until numRecords) { val record = records.get(i) diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index 8567e75b021b8..ee66338a3edeb 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -19,7 +19,7 @@ package kafka.api import java.io.File import java.util.ArrayList -import 
java.util.concurrent.ExecutionException +import java.util.concurrent.{ExecutionException, TimeoutException => JTimeoutException} import kafka.admin.AclCommand import kafka.common.TopicAndPartition @@ -159,8 +159,6 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") this.serverConfig.setProperty(KafkaConfig.MinInSyncReplicasProp, "3") this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group") - //to have testNoProduceAclWithoutDescribeAcl terminate quicker - this.producerConfig.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000") /** * Starts MiniKDC and only then sets up the parent trait. @@ -187,6 +185,7 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { */ @After override def tearDown { + consumers.foreach(_.wakeup()) super.tearDown closeSasl() } @@ -229,7 +228,7 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { * isn't set. */ @Test - def testNoProduceAclWithoutDescribeAcl { + def testNoProduceWithoutDescribeAcl { //Produce records debug("Starting to send records") try{ @@ -241,7 +240,7 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { } @Test - def testNoProduceAclWithDescribeAcl { + def testNoProduceWithDescribeAcl { AclCommand.main(describeAclArgs) servers.foreach { s => TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.apis.authorizer.get, topicResource) @@ -282,7 +281,7 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { consumeRecords(this.consumers.head) fail("exception expected") } catch { - case e: TestTimeoutException => //expected + case e: JTimeoutException => //expected } } @@ -312,8 +311,8 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { * ACL set. 
*/ @Test - def testNoConsumeAclWithDescribeAclViaAssign { - noConsumeAclWithDescribeAclProlog + def testNoConsumeWithDescribeAclViaAssign { + noConsumeWithDescribeAclSetup consumers.head.assign(List(tp).asJava) try { @@ -325,8 +324,8 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { } @Test - def testNoConsumeAclWithDescribeAclViaSubscribe { - noConsumeAclWithDescribeAclProlog + def testNoConsumeWithDescribeAclViaSubscribe { + noConsumeWithDescribeAclSetup consumers.head.subscribe(List(topic).asJava) try { @@ -337,7 +336,7 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { } } - private def noConsumeAclWithDescribeAclProlog { + private def noConsumeWithDescribeAclSetup { AclCommand.main(produceAclArgs) AclCommand.main(groupAclArgs) servers.foreach { s => @@ -394,18 +393,11 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { topic: String = topic, part: Int = part) { val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]() - val maxIters = numRecords * 50 - + val future = Future { - var iters = 0 - while (records.size < numRecords) { - for (record <- consumer.poll(50).asScala) { + while (records.size < numRecords) + for (record <- consumer.poll(50).asScala) records.add(record) - } - if (iters > maxIters) - throw new TestTimeoutException("Failed to consume the expected records after " + iters + " iterations.") - iters += 1 - } records } val result = Await.result(future, 10 seconds) @@ -420,5 +412,3 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { } } -class TestTimeoutException(msg:String) extends java.lang.RuntimeException (msg) - diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index af5fffd0a49cc..3796e48880d45 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -457,6 +457,7 
@@ object TestUtils extends Logging { val producerProps = props.getOrElse(new Properties) producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) producerProps.put(ProducerConfig.ACKS_CONFIG, acks.toString) + producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, maxBlockMs.toString) producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferSize.toString) producerProps.put(ProducerConfig.RETRIES_CONFIG, retries.toString) producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs.toString) From 6c7e51f0992f6027b5e15a8b280580c1b8f838a1 Mon Sep 17 00:00:00 2001 From: Edoardo Comar Date: Mon, 26 Sep 2016 11:58:07 +0100 Subject: [PATCH 06/14] KAFKA-3396 : Unauthorized topics are returned to the user Cleanup addressing comments --- .../main/scala/kafka/server/KafkaApis.scala | 14 +++++---- .../kafka/api/EndToEndAuthorizationTest.scala | 11 ++++++- .../kafka/api/IntegrationTestHarness.scala | 30 ++++++++++++------- 3 files changed, 39 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 35106d1b9de83..68414f522bfd4 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -233,11 +233,11 @@ class KafkaApis(val requestChannel: RequestChannel, val responseBody = new OffsetCommitResponse(results.asJava) requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody))) } else { - val (existingOrAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = offsetCommitRequest.offsetData.asScala.toMap.partition { + val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = offsetCommitRequest.offsetData.asScala.toMap.partition { case (topicPartition, _) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) && 
metadataCache.contains(topicPartition.topic) } - val (authorizedTopics, unauthorizedForReadTopics) = existingOrAuthorizedForDescribeTopics.partition { + val (authorizedTopics, unauthorizedForReadTopics) = existingAndAuthorizedForDescribeTopics.partition { case (topicPartition, _) => authorize(request.session, Read, new Resource(auth.Topic, topicPartition.topic)) } @@ -249,8 +249,12 @@ class KafkaApis(val requestChannel: RequestChannel, combinedCommitStatus.foreach { case (topicPartition, errorCode) => if (errorCode != Errors.NONE.code) { - debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " + - s"on partition $topicPartition failed due to ${Errors.forCode(errorCode).exceptionName}") + if (errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code && metadataCache.contains(topicPartition.topic)) + debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " + + s"on partition $topicPartition failed due to user not having DESCRIBE authorization, but returning UNKNOWN_TOPIC_OR_PARTITION") + else + debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " + + s"on partition $topicPartition failed due to ${Errors.forCode(errorCode).exceptionName}") } } val responseHeader = new ResponseHeader(header.correlationId) @@ -821,7 +825,7 @@ class KafkaApis(val requestChannel: RequestChannel, new MetadataResponse.TopicMetadata(Errors.TOPIC_AUTHORIZATION_FAILED, topic, common.Topic.isInternal(topic), java.util.Collections.emptyList())) - // do not disclose the existence of topics unauthorized for Describe + // do not disclose the existence of topics unauthorized for Describe, so we've not even checked if they exist or not val unauthorizedForDescribeTopicMetadata = // In case of all topics, don't include topics unauthorized for Describe if ((requestVersion == 0 && (metadataRequest.topics == null || metadataRequest.topics.isEmpty)) || 
metadataRequest.isAllTopics) diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index ee66338a3edeb..e207535c9d215 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -28,7 +28,7 @@ import kafka.server._ import kafka.utils._ import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord, ConsumerConfig} -import org.apache.kafka.clients.producer.{ProducerRecord, ProducerConfig} +import org.apache.kafka.clients.producer.{ProducerRecord, ProducerConfig, KafkaProducer} import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.{TopicPartition,KafkaException} import org.apache.kafka.common.protocol.SecurityProtocol @@ -180,6 +180,15 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { TestUtils.createTopic(zkUtils, topic, 1, 3, this.servers) } + override def createNewProducer() : KafkaProducer[Array[Byte], Array[Byte]] = { + TestUtils.createNewProducer(brokerList, + maxBlockMs = 5000L, + securityProtocol = this.securityProtocol, + trustStoreFile = this.trustStoreFile, + saslProperties = this.saslProperties, + props = Some(producerConfig)) + } + /** * Closes MiniKDC last when tearing down. 
*/ diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index 9595ad6c23b66..6b76bef66ffea 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -64,17 +64,9 @@ trait IntegrationTestHarness extends KafkaServerTestHarness { consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer]) consumerConfig.putAll(consumerSecurityProps) for (i <- 0 until producerCount) - producers += TestUtils.createNewProducer(brokerList, - securityProtocol = this.securityProtocol, - trustStoreFile = this.trustStoreFile, - saslProperties = this.saslProperties, - props = Some(producerConfig)) + producers += createNewProducer for (i <- 0 until consumerCount) { - consumers += TestUtils.createNewConsumer(brokerList, - securityProtocol = this.securityProtocol, - trustStoreFile = this.trustStoreFile, - saslProperties = this.saslProperties, - props = Some(consumerConfig)) + consumers += createNewConsumer } // create the consumer offset topic @@ -85,6 +77,24 @@ trait IntegrationTestHarness extends KafkaServerTestHarness { servers.head.groupCoordinator.offsetsTopicConfigs) } + //extracted method to allow for different params in some specific tests + def createNewProducer() : KafkaProducer[Array[Byte], Array[Byte]] = { + TestUtils.createNewProducer(brokerList, + securityProtocol = this.securityProtocol, + trustStoreFile = this.trustStoreFile, + saslProperties = this.saslProperties, + props = Some(producerConfig)) + } + + //extracted method to allow for different params in some specific tests + def createNewConsumer() : KafkaConsumer[Array[Byte], Array[Byte]] = { + TestUtils.createNewConsumer(brokerList, + securityProtocol = this.securityProtocol, + trustStoreFile = this.trustStoreFile, + saslProperties = 
this.saslProperties, + props = Some(consumerConfig)) + } + @After override def tearDown() { producers.foreach(_.close()) From 8d4f7f766541b0360fb90fd4c4b9c682319f95ef Mon Sep 17 00:00:00 2001 From: Edoardo Comar Date: Mon, 26 Sep 2016 12:14:38 +0100 Subject: [PATCH 07/14] KAFKA-3396 : Unauthorized topics are returned to the user Cleanup addressing comments --- .../integration/kafka/api/AuthorizerIntegrationTest.scala | 8 +++----- .../integration/kafka/api/EndToEndAuthorizationTest.scala | 2 +- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index e2be31049984f..2f8d8e0d8ab82 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -748,14 +748,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]() val future = Future { - TestUtils.waitUntilTrue(() => { - for (record <- consumer.poll(50).asScala) + while (records.size < numRecords) + for (record <- consumer.poll(50).asScala) records.add(record) - records.size >= numRecords - }, "Failed to consume %d records".format(numRecords), 10000L) records } - val result = Await.result(future, 15 seconds) + val result = Await.result(future, 10 seconds) for (i <- 0 until numRecords) { val record = records.get(i) diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index e207535c9d215..f7a0924748c37 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -405,7 +405,7 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { val future 
= Future { while (records.size < numRecords) - for (record <- consumer.poll(50).asScala) + for (record <- consumer.poll(50).asScala) records.add(record) records } From 7a8393c2611a9ffda29477cca9205f51693eeb3d Mon Sep 17 00:00:00 2001 From: Edoardo Comar Date: Mon, 26 Sep 2016 19:10:53 +0100 Subject: [PATCH 08/14] KAFKA-3396 : Unauthorized topics are returned to the user Revised handling of FETCH PRODUCE and DELETE requests --- .../security/auth/SimpleAclAuthorizer.scala | 4 +- .../main/scala/kafka/server/KafkaApis.scala | 54 ++++---- .../scala/kafka/server/MetadataCache.scala | 6 - .../kafka/api/AuthorizerIntegrationTest.scala | 117 ++++++++++++------ .../server/DeleteTopicsRequestTest.scala | 4 +- 5 files changed, 117 insertions(+), 68 deletions(-) diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index a36a07d0006c0..42bfebf61ab03 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -127,9 +127,9 @@ class SimpleAclAuthorizer extends Authorizer with Logging { //check if there is any Deny acl match that would disallow this operation. val denyMatch = aclMatch(session, operation, resource, principal, host, Deny, acls) - //if principal is allowed to read or write we allow describe by default, the reverse does not apply to Deny. + //if principal is allowed to read, write or delete we allow describe by default, the reverse does not apply to Deny. 
val ops = if (Describe == operation) - Set[Operation](operation, Read, Write) + Set[Operation](operation, Read, Write, Delete) else Set[Operation](operation) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 68414f522bfd4..02cd1184c226b 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -243,7 +243,7 @@ class KafkaApis(val requestChannel: RequestChannel, // the callback for sending an offset commit response def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Short]) { - var combinedCommitStatus = commitStatus.mapValues(new JShort(_)) ++ + val combinedCommitStatus = commitStatus.mapValues(new JShort(_)) ++ unauthorizedForReadTopics.mapValues(_ => new JShort(Errors.TOPIC_AUTHORIZATION_FAILED.code)) ++ nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)) @@ -270,9 +270,7 @@ class KafkaApis(val requestChannel: RequestChannel, case (topicPartition, partitionData) => val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicPartition.topic) try { - if (!metadataCache.hasTopicMetadata(topicPartition.topic)) - (topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code) - else if (partitionData.metadata != null && partitionData.metadata.length > config.offsetMetadataMaxSize) + if (partitionData.metadata != null && partitionData.metadata.length > config.offsetMetadataMaxSize) (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE.code) else { zkUtils.updatePersistentPath(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}", partitionData.offset.toString) @@ -338,15 +336,22 @@ class KafkaApis(val requestChannel: RequestChannel, val produceRequest = request.body.asInstanceOf[ProduceRequest] val numBytesAppended = request.header.sizeOf + produceRequest.sizeOf - val (authorizedRequestInfo, unauthorizedRequestInfo) = 
produceRequest.partitionRecords.asScala.partition { + val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = produceRequest.partitionRecords.asScala.partition { + case (topicPartition, _) => authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) && metadataCache.contains(topicPartition.topic) + } + + val (authorizedRequestInfo, unauthorizedForWriteRequestInfo) = existingAndAuthorizedForDescribeTopics.partition { case (topicPartition, _) => authorize(request.session, Write, new Resource(auth.Topic, topicPartition.topic)) } // the callback for sending a produce response def sendResponseCallback(responseStatus: Map[TopicPartition, PartitionResponse]) { - val mergedResponseStatus = responseStatus ++ unauthorizedRequestInfo.mapValues(_ => - new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, Message.NoTimestamp)) + val mergedResponseStatus = responseStatus ++ + unauthorizedForWriteRequestInfo.mapValues(_ => + new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, Message.NoTimestamp)) ++ + nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => + new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, -1, Message.NoTimestamp)) var errorInResponse = false @@ -434,11 +439,18 @@ class KafkaApis(val requestChannel: RequestChannel, def handleFetchRequest(request: RequestChannel.Request) { val fetchRequest = request.requestObj.asInstanceOf[FetchRequest] - val (authorizedRequestInfo, unauthorizedRequestInfo) = fetchRequest.requestInfo.partition { + val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics) = fetchRequest.requestInfo.partition { + case (topicAndPartition, _) => authorize(request.session, Describe, new Resource(auth.Topic, topicAndPartition.topic)) && metadataCache.contains(topicAndPartition.topic) + } + + val (authorizedRequestInfo, unauthorizedForReadRequestInfo) = existingAndAuthorizedForDescribeTopics.partition { case 
(topicAndPartition, _) => authorize(request.session, Read, new Resource(auth.Topic, topicAndPartition.topic)) } - val unauthorizedPartitionData = unauthorizedRequestInfo.map { case (tp, _) => + val nonExistingOrUnauthorizedForDescribePartitionData = nonExistingOrUnauthorizedForDescribeTopics.map { case (tp, _) => + (tp, FetchResponsePartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, -1, MessageSet.Empty)) + } + val unauthorizedForReadPartitionData = unauthorizedForReadRequestInfo.map { case (tp, _) => (tp, FetchResponsePartitionData(Errors.TOPIC_AUTHORIZATION_FAILED.code, -1, MessageSet.Empty)) } @@ -468,7 +480,7 @@ class KafkaApis(val requestChannel: RequestChannel, } } else responsePartitionData - val mergedPartitionData = convertedPartitionData ++ unauthorizedPartitionData + val mergedPartitionData = convertedPartitionData ++ unauthorizedForReadPartitionData ++ nonExistingOrUnauthorizedForDescribePartitionData mergedPartitionData.foreach { case (topicAndPartition, data) => if (data.error != Errors.NONE.code) @@ -888,7 +900,7 @@ class KafkaApis(val requestChannel: RequestChannel, val responseInfo = authorizedTopicPartitions.map { topicPartition => val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicPartition.topic) try { - if (!metadataCache.hasTopicMetadata(topicPartition.topic)) + if (!metadataCache.contains(topicPartition.topic)) (topicPartition, unknownTopicPartitionResponse) else { val payloadOpt = zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1 @@ -1179,21 +1191,17 @@ class KafkaApis(val requestChannel: RequestChannel, def handleDeleteTopicsRequest(request: RequestChannel.Request) { val deleteTopicRequest = request.body.asInstanceOf[DeleteTopicsRequest] - val (authorizedTopics, unauthorizedTopics) = deleteTopicRequest.topics.asScala.partition( topic => - authorize(request.session, Delete, new Resource(auth.Topic, topic)) + val (existingAndAuthorizedForDescribeTopics, 
nonExistingOrUnauthorizedForDescribeTopics) = deleteTopicRequest.topics.asScala.partition( topic => + authorize(request.session, Describe, new Resource(auth.Topic, topic)) && metadataCache.contains(topic) ) - val unauthorizedResults = unauthorizedTopics.map ( topic => - // Avoid leaking that the topic exists if the user is not authorized to describe the topic - if (authorize(request.session, Describe, new Resource(auth.Topic, topic))) { - (topic, Errors.TOPIC_AUTHORIZATION_FAILED) - } else { - (topic, Errors.INVALID_TOPIC_EXCEPTION) - } - ).toMap - + val (authorizedTopics, unauthorizedForDeleteTopics) = existingAndAuthorizedForDescribeTopics.partition( topic => + authorize(request.session, Delete, new Resource(auth.Topic, topic)) + ) + def sendResponseCallback(results: Map[String, Errors]): Unit = { - val completeResults = results ++ unauthorizedResults + val completeResults = nonExistingOrUnauthorizedForDescribeTopics.map( topic => (topic, Errors.UNKNOWN_TOPIC_OR_PARTITION)).toMap ++ + unauthorizedForDeleteTopics.map( topic => (topic, Errors.TOPIC_AUTHORIZATION_FAILED)).toMap ++ results val respHeader = new ResponseHeader(request.header.correlationId) val responseBody = new DeleteTopicsResponse(completeResults.asJava) trace(s"Sending delete topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.") diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index f493e7d96d02b..feef6ae7a0292 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -120,12 +120,6 @@ private[server] class MetadataCache(brokerId: Int) extends Logging { } } - def hasTopicMetadata(topic: String): Boolean = { - inReadLock(partitionMetadataLock) { - cache.contains(topic) - } - } - def getAllTopics(): Set[String] = { inReadLock(partitionMetadataLock) { cache.keySet.toSet diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 2f8d8e0d8ab82..a3d8a5e5c535a 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -260,22 +260,22 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testAuthorization() { val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( - ApiKeys.METADATA -> createMetadataRequest, - ApiKeys.PRODUCE -> createProduceRequest, - ApiKeys.FETCH -> createFetchRequest, - ApiKeys.LIST_OFFSETS -> createListOffsetsRequest, - ApiKeys.OFFSET_FETCH -> createOffsetFetchRequest, - ApiKeys.GROUP_COORDINATOR -> createGroupCoordinatorRequest, - ApiKeys.UPDATE_METADATA_KEY -> createUpdateMetadataRequest, - ApiKeys.JOIN_GROUP -> createJoinGroupRequest, - ApiKeys.SYNC_GROUP -> createSyncGroupRequest, - ApiKeys.OFFSET_COMMIT -> createOffsetCommitRequest, - ApiKeys.HEARTBEAT -> createHeartbeatRequest, - ApiKeys.LEAVE_GROUP -> createLeaveGroupRequest, - ApiKeys.LEADER_AND_ISR -> createLeaderAndIsrRequest, - ApiKeys.STOP_REPLICA -> createStopReplicaRequest, - ApiKeys.CONTROLLED_SHUTDOWN_KEY -> createControlledShutdownRequest, - ApiKeys.CREATE_TOPICS -> createTopicsRequest, +// ApiKeys.METADATA -> createMetadataRequest, +// ApiKeys.PRODUCE -> createProduceRequest, +// ApiKeys.FETCH -> createFetchRequest, +// ApiKeys.LIST_OFFSETS -> createListOffsetsRequest, +// ApiKeys.OFFSET_FETCH -> createOffsetFetchRequest, +// ApiKeys.GROUP_COORDINATOR -> createGroupCoordinatorRequest, +// ApiKeys.UPDATE_METADATA_KEY -> createUpdateMetadataRequest, +// ApiKeys.JOIN_GROUP -> createJoinGroupRequest, +// ApiKeys.SYNC_GROUP -> createSyncGroupRequest, +// ApiKeys.OFFSET_COMMIT -> createOffsetCommitRequest, +// ApiKeys.HEARTBEAT -> createHeartbeatRequest, +// ApiKeys.LEAVE_GROUP -> createLeaveGroupRequest, +// 
ApiKeys.LEADER_AND_ISR -> createLeaderAndIsrRequest, +// ApiKeys.STOP_REPLICA -> createStopReplicaRequest, +// ApiKeys.CONTROLLED_SHUTDOWN_KEY -> createControlledShutdownRequest, +// ApiKeys.CREATE_TOPICS -> createTopicsRequest, ApiKeys.DELETE_TOPICS -> deleteTopicsRequest ) @@ -293,19 +293,25 @@ class AuthorizerIntegrationTest extends BaseRequestTest { * checking that whether the topic exists or not, when unauthorized, FETCH and PRODUCE do not leak the topic name */ @Test - def testAuthorizationProduceFetchDoNotLeakTopicName() { + def testAuthorizationWithTopicNotExisting() { AdminUtils.deleteTopic(zkUtils, topic) TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers) + AdminUtils.deleteTopic(zkUtils, deleteTopic) + TestUtils.verifyTopicDeletion(zkUtils, deleteTopic, 1, servers) val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( ApiKeys.PRODUCE -> createProduceRequest, - ApiKeys.FETCH -> createFetchRequest + ApiKeys.FETCH -> createFetchRequest, + ApiKeys.DELETE_TOPICS -> deleteTopicsRequest ) for ((key, request) <- requestKeyToRequest) { removeAllAcls val resources = RequestKeysToAcls(key).map(_._1.resourceType).toSet - sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = false) + sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = false, topicExists = false) + for ((resource, acls) <- RequestKeysToAcls(key)) + addAndVerifyAcls(acls, resource) + sendRequestAndVerifyResponseErrorCode(key, request, resources, isAuthorized = true, topicExists = false) } } @@ -478,7 +484,26 @@ class AuthorizerIntegrationTest extends BaseRequestTest { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) this.consumers.head.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener) this.consumers.head.poll(50) - assertTrue(this.consumers.head.subscription().isEmpty()) + assertTrue(this.consumers.head.subscription.isEmpty) + } + + @Test + def 
testPatternSubscriptionWithTopicDescribeOnlyAndGroupRead() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(1, tp) + removeAllAcls() + + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), topicResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + val consumer = consumers.head + consumer.subscribe(Pattern.compile(topicPattern), new NoOpConsumerRebalanceListener) + try { + consumeRecords(consumer) + Assert.fail("Expected TopicAuthorizationException") + } catch { + case e: TopicAuthorizationException => //expected + } + } @Test @@ -522,8 +547,31 @@ class AuthorizerIntegrationTest extends BaseRequestTest { try { consumer.subscribe(Pattern.compile(".*"), new NoOpConsumerRebalanceListener) consumeRecords(consumer) + assertEquals(Set[String](topic).asJava, consumer.subscription) + } finally consumer.close() + } + + @Test + def testPatternSubscriptionMatchingInternalTopicWithDescribeOnlyPermission() { + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource) + sendRecords(1, tp) + removeAllAcls() + + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), groupResource) + val internalTopicResource = new Resource(Topic, kafka.common.Topic.GroupMetadataTopicName) + addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), internalTopicResource) + + val consumerConfig = new Properties + consumerConfig.put(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG, "false") + val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = group, + securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(consumerConfig)) + try { + 
consumer.subscribe(Pattern.compile(".*"), new NoOpConsumerRebalanceListener) + consumeRecords(consumer) + Assert.fail("Expected TopicAuthorizationException") } catch { - case e: JTimeoutException => //expected + case e: TopicAuthorizationException => //expected } finally consumer.close() } @@ -661,7 +709,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val response = send(deleteTopicsRequest, ApiKeys.DELETE_TOPICS) val deleteResponse = DeleteTopicsResponse.parse(response) - assertEquals(Errors.INVALID_TOPIC_EXCEPTION, deleteResponse.errors.asScala.head._2) + assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, deleteResponse.errors.asScala.head._2) } @Test @@ -692,29 +740,28 @@ class AuthorizerIntegrationTest extends BaseRequestTest { def sendRequestAndVerifyResponseErrorCode(apiKey: ApiKeys, request: AbstractRequest, resources: Set[ResourceType], - isAuthorized: Boolean): AbstractRequestResponse = { + isAuthorized: Boolean, + topicExists: Boolean = true): AbstractRequestResponse = { val resp = send(request, apiKey) val response = RequestKeyToResponseDeserializer(apiKey).getMethod("parse", classOf[ByteBuffer]).invoke(null, resp).asInstanceOf[AbstractRequestResponse] val errorCode = RequestKeyToErrorCode(apiKey).asInstanceOf[(AbstractRequestResponse) => Short](response) val possibleErrorCodes = resources.flatMap { resourceType => if (resourceType == Topic) - if (apiKey == ApiKeys.PRODUCE || apiKey == ApiKeys.FETCH ) { - //Only allowing TOPIC_AUTHORIZATION_FAILED as an error code - Seq(Errors.TOPIC_AUTHORIZATION_FAILED.code) - } else { - // When completely unauthorized topic resources may return an INVALID_TOPIC_EXCEPTION/UNKNOWN_TOPIC_OR_PARTITION to prevent leaking topic names - Seq(Errors.INVALID_TOPIC_EXCEPTION.code, Errors.UNKNOWN_TOPIC_OR_PARTITION.code) - } + // When completely unauthorized topic resources must return an UNKNOWN_TOPIC_OR_PARTITION to prevent leaking topic names + Seq(Errors.UNKNOWN_TOPIC_OR_PARTITION.code) else - 
Seq(resourceType.errorCode) + Seq(resourceType.errorCode) } - if (isAuthorized) - assertFalse(s"${apiKey} should be allowed. Found error code $errorCode", possibleErrorCodes.contains(errorCode)) + if (topicExists) + if (isAuthorized) + assertFalse(s"${apiKey} should be allowed. Found error code $errorCode", possibleErrorCodes.contains(errorCode)) + else + assertTrue(s"${apiKey} should be forbidden. Found error code $errorCode but expected one of ${possibleErrorCodes.mkString(",")} ", possibleErrorCodes.contains(errorCode)) else - assertTrue(s"${apiKey} should be forbidden. Found error code $errorCode but expected one of ${possibleErrorCodes.mkString(",")} ", possibleErrorCodes.contains(errorCode)) - + assertEquals(s"${apiKey} - Found error code $errorCode", Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), errorCode) + response } diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala index a59316b575b9f..e04e1b792ce91 100644 --- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala @@ -58,7 +58,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest { // Basic validateErrorDeleteTopicRequests(new DeleteTopicsRequest(Set("invalid-topic").asJava, timeout), - Map("invalid-topic" -> Errors.INVALID_TOPIC_EXCEPTION)) + Map("invalid-topic" -> Errors.UNKNOWN_TOPIC_OR_PARTITION)) // Partial TestUtils.createTopic(zkUtils, "partial-topic-1", 1, 1, servers) @@ -67,7 +67,7 @@ class DeleteTopicsRequestTest extends BaseRequestTest { "partial-invalid-topic").asJava, timeout), Map( "partial-topic-1" -> Errors.NONE, - "partial-invalid-topic" -> Errors.INVALID_TOPIC_EXCEPTION + "partial-invalid-topic" -> Errors.UNKNOWN_TOPIC_OR_PARTITION ) ) From 94b5f5c84e05ed702455321a198ca808735afb70 Mon Sep 17 00:00:00 2001 From: Edoardo Comar Date: Mon, 26 Sep 2016 23:48:52 +0100 Subject: [PATCH 09/14] KAFKA-3396 : 
Unauthorized topics are returned to the user oops had commented out part of a test by mistake --- .../kafka/api/AuthorizerIntegrationTest.scala | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index a3d8a5e5c535a..6e4511fbedb6c 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -260,22 +260,22 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testAuthorization() { val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( -// ApiKeys.METADATA -> createMetadataRequest, -// ApiKeys.PRODUCE -> createProduceRequest, -// ApiKeys.FETCH -> createFetchRequest, -// ApiKeys.LIST_OFFSETS -> createListOffsetsRequest, -// ApiKeys.OFFSET_FETCH -> createOffsetFetchRequest, -// ApiKeys.GROUP_COORDINATOR -> createGroupCoordinatorRequest, -// ApiKeys.UPDATE_METADATA_KEY -> createUpdateMetadataRequest, -// ApiKeys.JOIN_GROUP -> createJoinGroupRequest, -// ApiKeys.SYNC_GROUP -> createSyncGroupRequest, -// ApiKeys.OFFSET_COMMIT -> createOffsetCommitRequest, -// ApiKeys.HEARTBEAT -> createHeartbeatRequest, -// ApiKeys.LEAVE_GROUP -> createLeaveGroupRequest, -// ApiKeys.LEADER_AND_ISR -> createLeaderAndIsrRequest, -// ApiKeys.STOP_REPLICA -> createStopReplicaRequest, -// ApiKeys.CONTROLLED_SHUTDOWN_KEY -> createControlledShutdownRequest, -// ApiKeys.CREATE_TOPICS -> createTopicsRequest, + ApiKeys.METADATA -> createMetadataRequest, + ApiKeys.PRODUCE -> createProduceRequest, + ApiKeys.FETCH -> createFetchRequest, + ApiKeys.LIST_OFFSETS -> createListOffsetsRequest, + ApiKeys.OFFSET_FETCH -> createOffsetFetchRequest, + ApiKeys.GROUP_COORDINATOR -> createGroupCoordinatorRequest, + ApiKeys.UPDATE_METADATA_KEY -> createUpdateMetadataRequest, +
ApiKeys.JOIN_GROUP -> createJoinGroupRequest, + ApiKeys.SYNC_GROUP -> createSyncGroupRequest, + ApiKeys.OFFSET_COMMIT -> createOffsetCommitRequest, + ApiKeys.HEARTBEAT -> createHeartbeatRequest, + ApiKeys.LEAVE_GROUP -> createLeaveGroupRequest, + ApiKeys.LEADER_AND_ISR -> createLeaderAndIsrRequest, + ApiKeys.STOP_REPLICA -> createStopReplicaRequest, + ApiKeys.CONTROLLED_SHUTDOWN_KEY -> createControlledShutdownRequest, + ApiKeys.CREATE_TOPICS -> createTopicsRequest, ApiKeys.DELETE_TOPICS -> deleteTopicsRequest ) From 6e51c47ede6ce2ff913d53daf676efd97aaa9c14 Mon Sep 17 00:00:00 2001 From: Edoardo Comar Date: Tue, 27 Sep 2016 13:49:49 +0100 Subject: [PATCH 10/14] KAFKA-3396 : Unauthorized topics are returned to the user addressing comments: code formatting, not in upgrade.html --- .../integration/kafka/api/EndToEndAuthorizationTest.scala | 2 +- .../scala/integration/kafka/api/IntegrationTestHarness.scala | 4 ++-- docs/upgrade.html | 2 ++ 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index f7a0924748c37..9c36d3b6888d6 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -180,7 +180,7 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { TestUtils.createTopic(zkUtils, topic, 1, 3, this.servers) } - override def createNewProducer() : KafkaProducer[Array[Byte], Array[Byte]] = { + override def createNewProducer: KafkaProducer[Array[Byte], Array[Byte]] = { TestUtils.createNewProducer(brokerList, maxBlockMs = 5000L, securityProtocol = this.securityProtocol, diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index 6b76bef66ffea..0678fb15c17ad 100644 --- 
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -78,7 +78,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness { } //extracted method to allow for different params in some specific tests - def createNewProducer() : KafkaProducer[Array[Byte], Array[Byte]] = { + def createNewProducer: KafkaProducer[Array[Byte], Array[Byte]] = { TestUtils.createNewProducer(brokerList, securityProtocol = this.securityProtocol, trustStoreFile = this.trustStoreFile, @@ -87,7 +87,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness { } //extracted method to allow for different params in some specific tests - def createNewConsumer() : KafkaConsumer[Array[Byte], Array[Byte]] = { + def createNewConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = { TestUtils.createNewConsumer(brokerList, securityProtocol = this.securityProtocol, trustStoreFile = this.trustStoreFile, diff --git a/docs/upgrade.html b/docs/upgrade.html index 7b16ab0b6ecac..61174f0668f57 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -64,6 +64,8 @@
Notable changes in
  • Kafka clusters can now be uniquely identified by a cluster id. It will be automatically generated when a broker is upgraded to 0.10.1.0. The cluster id is available via the kafka.server:type=KafkaServer,name=ClusterId metric and it is part of the Metadata response. Serializers, client interceptors and metric reporters can receive the cluster id by implementing the ClusterResourceListener interface.
  • The BrokerState "RunningAsController" (value 4) has been removed. Due to a bug, a broker would only be in this state briefly before transitioning out of it and hence the impact of the removal should be minimal. The recommended way to detect if a given broker is the controller is via the kafka.controller:type=KafkaController,name=ActiveControllerCount metric.
  • The new Java Consumer now allows users to search offsets by timestamp on partitions.
  • +
  • When using an Authorizer and a user hasn't got Describe authorization on a topic, the broker will no longer return TOPIC_AUTHORIZATION_FAILED errors + but just UNKNOWN_TOPIC_OR_PARTITION errors, to avoid leaking topic names.
  • New Protocol Versions
    From 4bd02a1695be89b94a293abb341c7bcdfd10b3bd Mon Sep 17 00:00:00 2001 From: Edoardo Comar Date: Tue, 27 Sep 2016 14:43:14 +0100 Subject: [PATCH 11/14] KAFKA-3396 : Unauthorized topics are returned to the user addressing comments: missing --- docs/upgrade.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/upgrade.html b/docs/upgrade.html index 61174f0668f57..1b1c5937310e9 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -65,7 +65,7 @@
    Notable changes in
  • The BrokerState "RunningAsController" (value 4) has been removed. Due to a bug, a broker would only be in this state briefly before transitioning out of it and hence the impact of the removal should be minimal. The recommended way to detect if a given broker is the controller is via the kafka.controller:type=KafkaController,name=ActiveControllerCount metric.
  • The new Java Consumer now allows users to search offsets by timestamp on partitions.
  • When using an Authorizer and a user hasn't got Describe authorization on a topic, the broker will no longer return TOPIC_AUTHORIZATION_FAILED errors - but just UNKNOWN_TOPIC_OR_PARTITION errors, to avoid leaking topic names.
  • + but just UNKNOWN_TOPIC_OR_PARTITION errors, to avoid leaking topic names.
    New Protocol Versions
    From 5ee7bd1e0773bb0ab8a265a8a707b0e72f249f27 Mon Sep 17 00:00:00 2001 From: Edoardo Comar Date: Wed, 28 Sep 2016 16:38:48 +0100 Subject: [PATCH 12/14] KAFKA-3396 : Unauthorized topics are returned to the user addressing comments: handling UTOP in ConsumerCoordinator offset fetch/commit improved logging efficiency on the server --- .../internals/ConsumerCoordinator.java | 6 ++++++ .../main/scala/kafka/server/KafkaApis.scala | 20 +++++++++++-------- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index ff0d66991850a..27d6a752de5ce 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -670,6 +670,10 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture futu resetGeneration(); future.raise(new CommitFailedException()); return; + } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { + log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message()); + future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic: " + error.message())); + return; } else { log.error("Group {} failed to commit partition {} at offset {}: {}", groupId, tp, offset, error.message()); future.raise(new KafkaException("Unexpected error in commit: " + error.message())); @@ -731,6 +735,8 @@ public void handle(OffsetFetchResponse response, RequestFuture authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) && metadataCache.contains(topicPartition.topic) + case (topicPartition, _) => { + val authorizedForDescribe = authorize(request.session, Describe, new Resource(auth.Topic, topicPartition.topic)) + val exists = 
metadataCache.contains(topicPartition.topic) + if (!authorizedForDescribe && exists) + debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " + + s"on partition $topicPartition failing due to user not having DESCRIBE authorization, but returning UNKNOWN_TOPIC_OR_PARTITION") + authorizedForDescribe && exists + } } val (authorizedTopics, unauthorizedForReadTopics) = existingAndAuthorizedForDescribeTopics.partition { @@ -247,16 +254,13 @@ class KafkaApis(val requestChannel: RequestChannel, unauthorizedForReadTopics.mapValues(_ => new JShort(Errors.TOPIC_AUTHORIZATION_FAILED.code)) ++ nonExistingOrUnauthorizedForDescribeTopics.mapValues(_ => new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)) - combinedCommitStatus.foreach { case (topicPartition, errorCode) => - if (errorCode != Errors.NONE.code) { - if (errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code && metadataCache.contains(topicPartition.topic)) - debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " + - s"on partition $topicPartition failed due to user not having DESCRIBE authorization, but returning UNKNOWN_TOPIC_OR_PARTITION") - else + if (logger.isDebugEnabled()) //optimizing code as it's a loop + combinedCommitStatus.foreach { case (topicPartition, errorCode) => + if (errorCode != Errors.NONE.code) { debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " + s"on partition $topicPartition failed due to ${Errors.forCode(errorCode).exceptionName}") + } } - } val responseHeader = new ResponseHeader(header.correlationId) val responseBody = new OffsetCommitResponse(combinedCommitStatus.asJava) requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody))) From 376a5a75443eb5c5ec65b631fac6b984e4fbb63a Mon Sep 17 00:00:00 2001 From: Edoardo Comar Date: Thu, 29 Sep 2016 13:58:59 +0100 
Subject: [PATCH 13/14] KAFKA-3396 : Unauthorized topics are returned to the user test cleanup and fixing intermittent failure --- .../kafka/api/AuthorizerIntegrationTest.scala | 2 +- .../kafka/api/EndToEndAuthorizationTest.scala | 7 +++---- .../kafka/api/IntegrationTestHarness.scala | 18 +++++++++++++++++- 3 files changed, 21 insertions(+), 6 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 6e4511fbedb6c..be41581865537 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -14,7 +14,7 @@ package kafka.api import java.nio.ByteBuffer import java.util -import java.util.concurrent.{TimeoutException ⇒ JTimeoutException, ExecutionException} +import java.util.concurrent.ExecutionException import java.util.regex.Pattern import java.util.{ArrayList, Collections, Properties} diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index 9c36d3b6888d6..2f5858cfc3eb1 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -42,7 +42,6 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ import scala.util.{Failure, Success} - /** * The test cases here verify that a producer authorized to publish to a topic * is able to, and that consumers in a group authorized to consume are able to @@ -270,7 +269,7 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { */ @Test def testNoConsumeWithoutDescribeAclViaAssign { - noConsumeWithoutDescribeAclProlog + noConsumeWithoutDescribeAclSetup consumers.head.assign(List(tp).asJava) try { @@ -283,7 +282,7 @@ trait 
EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { @Test def testNoConsumeWithoutDescribeAclViaSubscribe { - noConsumeWithoutDescribeAclProlog + noConsumeWithoutDescribeAclSetup consumers.head.subscribe(List(topic).asJava) try { @@ -294,7 +293,7 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup { } } - private def noConsumeWithoutDescribeAclProlog { + private def noConsumeWithoutDescribeAclSetup { AclCommand.main(produceAclArgs) AclCommand.main(groupAclArgs) servers.foreach { s => diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index 0678fb15c17ad..ffca4311b07a3 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -29,6 +29,8 @@ import kafka.integration.KafkaServerTestHarness import org.junit.{After, Before} import scala.collection.mutable.Buffer +import scala.util.control.Breaks.{breakable, break} +import java.util.ConcurrentModificationException /** * A helper class for writing integration tests that involve producers, consumers, and servers @@ -98,7 +100,21 @@ trait IntegrationTestHarness extends KafkaServerTestHarness { @After override def tearDown() { producers.foreach(_.close()) - consumers.foreach(_.close()) + + consumers.foreach { consumer => + breakable { + while(true) { + try { + consumer.close + break + } catch { + //short wait to make sure that woken up consumer can be closed without spurious ConcurrentModificationException + case e: ConcurrentModificationException => Thread.sleep(100L) + } + } + } + } + + super.tearDown() } From 52232c9fa15e32446bf4c953535d927a71ac3d6d Mon Sep 17 00:00:00 2001 From: Edoardo Comar Date: Fri, 30 Sep 2016 15:27:15 +0100 Subject: [PATCH 14/14] KAFKA-3396 : Unauthorized topics are returned to the user throwing UTOP also from AdminUtils.delete ---
core/src/main/scala/kafka/admin/AdminUtils.scala | 4 ++-- core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 0273bdb0a58b0..7873028e19db4 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -26,7 +26,7 @@ import kafka.utils.ZkUtils._ import java.util.Random import java.util.Properties import org.apache.kafka.common.Node -import org.apache.kafka.common.errors.{ReplicaNotAvailableException, InvalidTopicException, LeaderNotAvailableException, InvalidPartitionsException, InvalidReplicationFactorException, TopicExistsException, InvalidReplicaAssignmentException} +import org.apache.kafka.common.errors.{ReplicaNotAvailableException, UnknownTopicOrPartitionException, InvalidTopicException, LeaderNotAvailableException, InvalidPartitionsException, InvalidReplicationFactorException, TopicExistsException, InvalidReplicaAssignmentException} import org.apache.kafka.common.protocol.{Errors, SecurityProtocol} import org.apache.kafka.common.requests.MetadataResponse @@ -325,7 +325,7 @@ object AdminUtils extends Logging with AdminUtilities { case e2: Throwable => throw new AdminOperationException(e2) } } else { - throw new InvalidTopicException("topic %s to delete does not exist".format(topic)) + throw new UnknownTopicOrPartitionException("topic %s to delete does not exist".format(topic)) } } diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala index ea5a21348d8b3..d39de75563a8b 100644 --- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala @@ -26,7 +26,7 @@ import org.junit.Test import java.util.Properties import kafka.common.{TopicAlreadyMarkedForDeletionException, 
TopicAndPartition} -import org.apache.kafka.common.errors.InvalidTopicException +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException class DeleteTopicTest extends ZooKeeperTestHarness { @@ -206,9 +206,9 @@ class DeleteTopicTest extends ZooKeeperTestHarness { // start topic deletion try { AdminUtils.deleteTopic(zkUtils, "test2") - fail("Expected InvalidTopicException") + fail("Expected UnknownTopicOrPartitionException") } catch { - case e: InvalidTopicException => // expected exception + case e: UnknownTopicOrPartitionException => // expected exception } // verify delete topic path for test2 is removed from zookeeper TestUtils.verifyTopicDeletion(zkUtils, "test2", 1, servers)