From 6ee590bc8f65217227c8bda98644dce35ed0d701 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Sun, 6 Mar 2016 20:04:45 -0800 Subject: [PATCH 1/7] KAFKA-2960: Clear purgatory for partition before becoming follower --- .../scala/kafka/server/DelayedProduce.scala | 5 +- .../scala/kafka/server/ReplicaManager.scala | 5 ++ .../ClearPurgatoryOnLeaderMovementTest.scala | 89 +++++++++++++++++++ 3 files changed, 98 insertions(+), 1 deletion(-) create mode 100644 core/src/test/scala/integration/kafka/api/ClearPurgatoryOnLeaderMovementTest.scala diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index be1be4faf97c7..52ec238b255c1 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -89,7 +89,10 @@ class DelayedProduce(delayMs: Long, val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition) val (hasEnough, errorCode) = partitionOpt match { case Some(partition) => - partition.checkEnoughReplicasReachOffset(status.requiredOffset) + if (partition.leaderReplicaIfLocal().isEmpty) + (false, Errors.NOT_LEADER_FOR_PARTITION.code) + else + partition.checkEnoughReplicasReachOffset(status.requiredOffset) case None => // Case A (false, Errors.UNKNOWN_TOPIC_OR_PARTITION.code) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index e388d98046c15..be6713fc181d5 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -800,6 +800,11 @@ class ReplicaManager(val config: KafkaConfig, } logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark.messageOffset)).toMap) + partitionsToMakeFollower.foreach(partition => { + val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topic, partition.partitionId) + tryCompleteDelayedProduce(topicPartitionOperationKey) + tryCompleteDelayedFetch(topicPartitionOperationKey) + }) partitionsToMakeFollower.foreach { partition => stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition [%s,%d] as part of " + diff --git a/core/src/test/scala/integration/kafka/api/ClearPurgatoryOnLeaderMovementTest.scala b/core/src/test/scala/integration/kafka/api/ClearPurgatoryOnLeaderMovementTest.scala new file mode 100644 index 0000000000000..ab2ccd2b026c7 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/ClearPurgatoryOnLeaderMovementTest.scala @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package integration.kafka.api + +import java.util.Properties +import java.util.concurrent.ExecutionException + +import kafka.admin.{PreferredReplicaLeaderElectionCommand, ReassignPartitionsCommand} +import kafka.common.TopicAndPartition +import kafka.integration.KafkaServerTestHarness +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.errors.NotLeaderForPartitionException +import org.junit.Assert.{assertFalse, assertTrue} +import org.junit.Test + +/** + * Test the case where produce requests in the purgatory should be cleared when the leader migrates to another broker. + */ +class ClearPurgatoryOnLeaderMovementTest extends KafkaServerTestHarness { + + val topic = "topic" + val partition = 0 + def generateConfigs = { + val overridingProps = new Properties() + val numServers = 2 + overridingProps.put(KafkaConfig.NumPartitionsProp, 1.toString) + // Make sure the fetch request will not return. + overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, "100000") + overridingProps.put(KafkaConfig.ReplicaSocketTimeoutMsProp, "100000") + overridingProps.put(KafkaConfig.ReplicaFetchMinBytesProp, "100000") + overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, "100000") + TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol), + trustStoreFile = trustStoreFile).map(KafkaConfig.fromProps(_, overridingProps)) + } + + @Test + def testCleanProducerPurgatoryOnBecomeFollower() { + TestUtils.createTopic(zkUtils, topic, 1, 2, servers, new Properties()) + TestUtils.waitUntilLeaderIsKnown(servers, topic, partition) + val producer = TestUtils.createNewProducer(brokerList) + + // fetch the metadata. + producer.partitionsFor(topic) + val future = producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "key".getBytes, "value".getBytes)) + // Wait 10 millisends to make sure the produce request has been sent. It is ugly but we don't have a way to + // see if the producer has sent the produce request. + Thread.sleep(10) + assertFalse(future.isDone) + // Trigger a leader migration + if (TestUtils.isLeaderLocalOnBroker(topic, 0, servers(0))) + moveLeader(oldLeader = 0, newLeader = 1) + else + moveLeader(oldLeader = 1, newLeader = 0) + try { + future.get() + fail("Should throw ExecutionException") + } catch { + case e: ExecutionException => + assertTrue(e.getCause.isInstanceOf[NotLeaderForPartitionException]) + } + + def moveLeader(oldLeader: Int, newLeader: Int) { + val tap = new TopicAndPartition(topic, partition) + new ReassignPartitionsCommand(zkUtils, Map({tap -> Seq(newLeader, oldLeader)})).reassignPartitions() + TestUtils.waitUntilTrue(() => zkUtils.getPartitionsBeingReassigned().isEmpty, "Failed to finish partition assgiment before timeout") + PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkUtils, Set(tap)) + TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils = zkUtils, topic = topic, partition = partition, newLeaderOpt = Some(newLeader)) + assertTrue(TestUtils.isLeaderLocalOnBroker(topic, 0, servers(newLeader))) + } + } + +} From 846cf0620517457826512ca788ef49c7247cab8e Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Mon, 7 Mar 2016 11:30:21 -0800 Subject: [PATCH 2/7] Addressed Ismael's comments. --- core/src/main/scala/kafka/server/DelayedProduce.scala | 5 +---- core/src/main/scala/kafka/server/ReplicaManager.scala | 7 ++++--- .../kafka/api/ClearPurgatoryOnLeaderMovementTest.scala | 8 ++++---- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala index 52ec238b255c1..be1be4faf97c7 100644 --- a/core/src/main/scala/kafka/server/DelayedProduce.scala +++ b/core/src/main/scala/kafka/server/DelayedProduce.scala @@ -89,10 +89,7 @@ class DelayedProduce(delayMs: Long, val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition) val (hasEnough, errorCode) = partitionOpt match { case Some(partition) => - if (partition.leaderReplicaIfLocal().isEmpty) - (false, Errors.NOT_LEADER_FOR_PARTITION.code) - else - partition.checkEnoughReplicasReachOffset(status.requiredOffset) + partition.checkEnoughReplicasReachOffset(status.requiredOffset) case None => // Case A (false, Errors.UNKNOWN_TOPIC_OR_PARTITION.code) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index be6713fc181d5..56cc5f346b162 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -737,7 +737,8 @@ class ReplicaManager(val config: KafkaConfig, * 2. Mark the replicas as followers so that no more data can be added from the producer clients. * 3. Stop fetchers for these partitions so that no more data can be added by the replica fetcher threads. * 4. Truncate the log and checkpoint offsets for these partitions. - * 5. If the broker is not shutting down, add the fetcher to the new leaders. + * 5. Clear the produce and fetch requests in the purgatory + * 6. If the broker is not shutting down, add the fetcher to the new leaders. * * The ordering of doing these steps make sure that the replicas in transition will not * take any more messages before checkpointing offsets so that all messages before the checkpoint @@ -800,11 +801,11 @@ class ReplicaManager(val config: KafkaConfig, } logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark.messageOffset)).toMap) - partitionsToMakeFollower.foreach(partition => { + partitionsToMakeFollower.foreach { partition => val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topic, partition.partitionId) tryCompleteDelayedProduce(topicPartitionOperationKey) tryCompleteDelayedFetch(topicPartitionOperationKey) - }) + } partitionsToMakeFollower.foreach { partition => stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition [%s,%d] as part of " + diff --git a/core/src/test/scala/integration/kafka/api/ClearPurgatoryOnLeaderMovementTest.scala b/core/src/test/scala/integration/kafka/api/ClearPurgatoryOnLeaderMovementTest.scala index ab2ccd2b026c7..01e43d3a80513 100644 --- a/core/src/test/scala/integration/kafka/api/ClearPurgatoryOnLeaderMovementTest.scala +++ b/core/src/test/scala/integration/kafka/api/ClearPurgatoryOnLeaderMovementTest.scala @@ -58,7 +58,7 @@ class ClearPurgatoryOnLeaderMovementTest extends KafkaServerTestHarness { // fetch the metadata. producer.partitionsFor(topic) - val future = producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "key".getBytes, "value".getBytes)) + val future = producer.send(new ProducerRecord(topic, "key".getBytes, "value".getBytes)) // Wait 10 millisends to make sure the produce request has been sent. It is ugly but we don't have a way to // see if the producer has sent the produce request. Thread.sleep(10) @@ -78,11 +78,11 @@ class ClearPurgatoryOnLeaderMovementTest extends KafkaServerTestHarness { def moveLeader(oldLeader: Int, newLeader: Int) { val tap = new TopicAndPartition(topic, partition) - new ReassignPartitionsCommand(zkUtils, Map({tap -> Seq(newLeader, oldLeader)})).reassignPartitions() - TestUtils.waitUntilTrue(() => zkUtils.getPartitionsBeingReassigned().isEmpty, "Failed to finish partition assgiment before timeout") + new ReassignPartitionsCommand(zkUtils, Map(tap -> Seq(newLeader, oldLeader))).reassignPartitions() + TestUtils.waitUntilTrue(() => zkUtils.getPartitionsBeingReassigned().isEmpty, "Failed to finish partition assignment before timeout.") PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkUtils, Set(tap)) TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils = zkUtils, topic = topic, partition = partition, newLeaderOpt = Some(newLeader)) - assertTrue(TestUtils.isLeaderLocalOnBroker(topic, 0, servers(newLeader))) + assertTrue(TestUtils.isLeaderLocalOnBroker(topic, partition, servers(newLeader))) } } From c2d9e2911da842243a667509c58930abe2296c05 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Mon, 7 Mar 2016 13:15:54 -0800 Subject: [PATCH 3/7] Addressed Ismael's comments. --- .../kafka/api/ClearPurgatoryOnLeaderMovementTest.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/ClearPurgatoryOnLeaderMovementTest.scala b/core/src/test/scala/integration/kafka/api/ClearPurgatoryOnLeaderMovementTest.scala index 01e43d3a80513..9b9edb065a2b1 100644 --- a/core/src/test/scala/integration/kafka/api/ClearPurgatoryOnLeaderMovementTest.scala +++ b/core/src/test/scala/integration/kafka/api/ClearPurgatoryOnLeaderMovementTest.scala @@ -46,8 +46,7 @@ class ClearPurgatoryOnLeaderMovementTest extends KafkaServerTestHarness { overridingProps.put(KafkaConfig.ReplicaSocketTimeoutMsProp, "100000") overridingProps.put(KafkaConfig.ReplicaFetchMinBytesProp, "100000") overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, "100000") - TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol), - trustStoreFile = trustStoreFile).map(KafkaConfig.fromProps(_, overridingProps)) + TestUtils.createBrokerConfigs(numServers, zkConnect).map(KafkaConfig.fromProps(_, overridingProps)) } @Test From 9dd1369a3f8d38a88deb346ab43fda62ea523351 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Tue, 8 Mar 2016 14:59:36 -0800 Subject: [PATCH 4/7] Addressed Joel's comments. --- .../ClearPurgatoryOnLeaderMovementTest.scala | 88 ------------------- .../kafka/server/ReplicaManagerTest.scala | 75 ++++++++++++++-- 2 files changed, 66 insertions(+), 97 deletions(-) delete mode 100644 core/src/test/scala/integration/kafka/api/ClearPurgatoryOnLeaderMovementTest.scala diff --git a/core/src/test/scala/integration/kafka/api/ClearPurgatoryOnLeaderMovementTest.scala b/core/src/test/scala/integration/kafka/api/ClearPurgatoryOnLeaderMovementTest.scala deleted file mode 100644 index 9b9edb065a2b1..0000000000000 --- a/core/src/test/scala/integration/kafka/api/ClearPurgatoryOnLeaderMovementTest.scala +++ /dev/null @@ -1,88 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package integration.kafka.api - -import java.util.Properties -import java.util.concurrent.ExecutionException - -import kafka.admin.{PreferredReplicaLeaderElectionCommand, ReassignPartitionsCommand} -import kafka.common.TopicAndPartition -import kafka.integration.KafkaServerTestHarness -import kafka.server.KafkaConfig -import kafka.utils.TestUtils -import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.kafka.common.errors.NotLeaderForPartitionException -import org.junit.Assert.{assertFalse, assertTrue} -import org.junit.Test - -/** - * Test the case where produce requests in the purgatory should be cleared when the leader migrates to another broker. - */ -class ClearPurgatoryOnLeaderMovementTest extends KafkaServerTestHarness { - - val topic = "topic" - val partition = 0 - def generateConfigs = { - val overridingProps = new Properties() - val numServers = 2 - overridingProps.put(KafkaConfig.NumPartitionsProp, 1.toString) - // Make sure the fetch request will not return. - overridingProps.put(KafkaConfig.ReplicaLagTimeMaxMsProp, "100000") - overridingProps.put(KafkaConfig.ReplicaSocketTimeoutMsProp, "100000") - overridingProps.put(KafkaConfig.ReplicaFetchMinBytesProp, "100000") - overridingProps.put(KafkaConfig.ReplicaFetchWaitMaxMsProp, "100000") - TestUtils.createBrokerConfigs(numServers, zkConnect).map(KafkaConfig.fromProps(_, overridingProps)) - } - - @Test - def testCleanProducerPurgatoryOnBecomeFollower() { - TestUtils.createTopic(zkUtils, topic, 1, 2, servers, new Properties()) - TestUtils.waitUntilLeaderIsKnown(servers, topic, partition) - val producer = TestUtils.createNewProducer(brokerList) - - // fetch the metadata. - producer.partitionsFor(topic) - val future = producer.send(new ProducerRecord(topic, "key".getBytes, "value".getBytes)) - // Wait 10 millisends to make sure the produce request has been sent. It is ugly but we don't have a way to - // see if the producer has sent the produce request. - Thread.sleep(10) - assertFalse(future.isDone) - // Trigger a leader migration - if (TestUtils.isLeaderLocalOnBroker(topic, 0, servers(0))) - moveLeader(oldLeader = 0, newLeader = 1) - else - moveLeader(oldLeader = 1, newLeader = 0) - try { - future.get() - fail("Should throw ExecutionException") - } catch { - case e: ExecutionException => - assertTrue(e.getCause.isInstanceOf[NotLeaderForPartitionException]) - } - - def moveLeader(oldLeader: Int, newLeader: Int) { - val tap = new TopicAndPartition(topic, partition) - new ReassignPartitionsCommand(zkUtils, Map(tap -> Seq(newLeader, oldLeader))).reassignPartitions() - TestUtils.waitUntilTrue(() => zkUtils.getPartitionsBeingReassigned().isEmpty, "Failed to finish partition assignment before timeout.") - PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkUtils, Set(tap)) - TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils = zkUtils, topic = topic, partition = partition, newLeaderOpt = Some(newLeader)) - assertTrue(TestUtils.isLeaderLocalOnBroker(topic, partition, servers(newLeader))) - } - } - -} diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 32085f6ae0d7e..398b1951e822a 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -18,25 +18,26 @@ package kafka.server -import kafka.api.SerializationTestUtils -import kafka.message.{Message, ByteBufferMessageSet} -import kafka.utils.{ZkUtils, MockScheduler, MockTime, TestUtils} -import org.apache.kafka.common.requests.ProduceRequest - -import java.util.concurrent.atomic.AtomicBoolean import java.io.File +import java.util.concurrent.atomic.AtomicBoolean +import kafka.cluster.Broker +import kafka.message.{ByteBufferMessageSet, Message} +import kafka.utils.{MockScheduler, MockTime, TestUtils, ZkUtils} +import org.I0Itec.zkclient.ZkClient import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.LeaderAndIsrRequest +import org.apache.kafka.common.requests.LeaderAndIsrRequest.PartitionState import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.{MockTime => JMockTime} +import org.apache.kafka.common.{BrokerEndPoint, TopicPartition} import org.easymock.EasyMock -import org.I0Itec.zkclient.ZkClient +import org.junit.Assert.{assertTrue, assertEquals} import org.junit.Test +import scala.collection.JavaConversions._ import scala.collection.Map -import scala.collection.JavaConverters._ class ReplicaManagerTest { @@ -118,4 +119,60 @@ class ReplicaManagerTest { TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName) } + + @Test + def testClearPurgatoryOnBecomingFollower() { + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) + props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath) + val config = KafkaConfig.fromProps(props) + val zkClient = EasyMock.createMock(classOf[ZkClient]) + val zkUtils = ZkUtils(zkClient, false) + val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) + val time: MockTime = new MockTime() + val jTime = new JMockTime + val metrics = new Metrics + val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr, + new AtomicBoolean(false)) + + try { + var callbackFired = false + def callback(responseStatus: Map[TopicPartition, PartitionResponse]) = { + assertEquals("Should give NotLeaderForPartitionException", Errors.NOT_LEADER_FOR_PARTITION.code, responseStatus.values.head.errorCode) + callbackFired = true + } + + val aliveBrokers = Seq(new Broker(0, "host0", 0), new Broker(1, "host1", 1)) + val metadataCache = EasyMock.createMock(classOf[MetadataCache]) + EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes() + EasyMock.replay(metadataCache) + + val partition = rm.getOrCreatePartition(topic, 0) + partition.getOrCreateReplica(0) + // Make this replica the leader. + val leaderAndIsrRequest1 = new LeaderAndIsrRequest(0, 0, + mapAsJavaMap(collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, seqAsJavaList(Seq(0, 1)), 0, setAsJavaSet(Set(0, 1))))), + setAsJavaSet(Set(new BrokerEndPoint(0, "host1", 0), new BrokerEndPoint(1, "host2", 1)))) + rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, metadataCache, (_, _) => {}) + rm.getLeaderReplicaIfLocal(topic, 0) + + // Append a message. + rm.appendMessages( + timeout = 1000, + requiredAcks = -1, + internalTopicsAllowed = false, + messagesPerPartition = Map(new TopicPartition(topic, 0) -> new ByteBufferMessageSet(new Message("first message".getBytes))), + responseCallback = callback) + + // Make this replica the follower + val leaderAndIsrRequest2 = new LeaderAndIsrRequest(0, 0, + mapAsJavaMap(collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 1, 1, seqAsJavaList(Seq(0, 1)), 1, setAsJavaSet(Set(0, 1))))), + setAsJavaSet(Set(new BrokerEndPoint(0, "host1", 0), new BrokerEndPoint(1, "host2", 1)))) + rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, metadataCache, (_, _) => {}) + + assertTrue(callbackFired) + } finally { + rm.shutdown(false) + metrics.close() + } + } } From 7589c17dde5ee93a7bced286b634dd4cec30d783 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Wed, 9 Mar 2016 13:05:39 -0800 Subject: [PATCH 5/7] Addressed Ismael's comments. --- .../kafka/server/ReplicaManagerTest.scala | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 398b1951e822a..93951e6b25f68 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -33,10 +33,10 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.{MockTime => JMockTime} import org.apache.kafka.common.{BrokerEndPoint, TopicPartition} import org.easymock.EasyMock -import org.junit.Assert.{assertTrue, assertEquals} +import org.junit.Assert.{assertEquals, assertTrue} import org.junit.Test -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.Map class ReplicaManagerTest { @@ -50,7 +50,7 @@ class ReplicaManagerTest { val zkClient = EasyMock.createMock(classOf[ZkClient]) val zkUtils = ZkUtils(zkClient, false) val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) - val time: MockTime = new MockTime() + val time = new MockTime() val jTime = new JMockTime val metrics = new Metrics val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr, @@ -74,7 +74,7 @@ class ReplicaManagerTest { val zkClient = EasyMock.createMock(classOf[ZkClient]) val zkUtils = ZkUtils(zkClient, false) val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) - val time: MockTime = new MockTime() + val time = new MockTime() val jTime = new JMockTime val metrics = new Metrics val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr, @@ -97,7 +97,7 @@ class ReplicaManagerTest { val zkClient = EasyMock.createMock(classOf[ZkClient]) val zkUtils = ZkUtils(zkClient, false) val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) - val time: MockTime = new MockTime() + val time = new MockTime() val jTime = new JMockTime val metrics = new Metrics val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr, @@ -128,7 +128,7 @@ class ReplicaManagerTest { val zkClient = EasyMock.createMock(classOf[ZkClient]) val zkUtils = ZkUtils(zkClient, false) val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) - val time: MockTime = new MockTime() + val time = new MockTime() val jTime = new JMockTime val metrics = new Metrics val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr, @@ -150,8 +150,9 @@ class ReplicaManagerTest { partition.getOrCreateReplica(0) // Make this replica the leader. val leaderAndIsrRequest1 = new LeaderAndIsrRequest(0, 0, - mapAsJavaMap(collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, seqAsJavaList(Seq(0, 1)), 0, setAsJavaSet(Set(0, 1))))), - setAsJavaSet(Set(new BrokerEndPoint(0, "host1", 0), new BrokerEndPoint(1, "host2", 1)))) + collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, Seq(0, 1).asJava.asInstanceOf[java.util.List[Integer]], + 0, Set(0, 1).asJava.asInstanceOf[java.util.Set[Integer]])).asJava, + Set(new BrokerEndPoint(0, "host1", 0), new BrokerEndPoint(1, "host2", 1)).asJava) rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, metadataCache, (_, _) => {}) rm.getLeaderReplicaIfLocal(topic, 0) @@ -165,8 +166,9 @@ class ReplicaManagerTest { // Make this replica the follower val leaderAndIsrRequest2 = new LeaderAndIsrRequest(0, 0, - mapAsJavaMap(collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 1, 1, seqAsJavaList(Seq(0, 1)), 1, setAsJavaSet(Set(0, 1))))), - setAsJavaSet(Set(new BrokerEndPoint(0, "host1", 0), new BrokerEndPoint(1, "host2", 1)))) + collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 1, 1, Seq(0, 1).asJava.asInstanceOf[java.util.List[Integer]], + 0, Set(0, 1).asJava.asInstanceOf[java.util.Set[Integer]])).asJava, + Set(new BrokerEndPoint(0, "host1", 0), new BrokerEndPoint(1, "host2", 1)).asJava) rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, metadataCache, (_, _) => {}) assertTrue(callbackFired) From 29c8d93f18bad209d9b34c240ed5f03a9470f043 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Thu, 10 Mar 2016 15:55:44 -0800 Subject: [PATCH 6/7] Addressed Joel and Ismael's comments. --- .../kafka/server/ReplicaManagerTest.scala | 46 +++++++++++++------ 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 93951e6b25f68..8d6c84ce4c46c 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -21,7 +21,9 @@ package kafka.server import java.io.File import java.util.concurrent.atomic.AtomicBoolean +import kafka.api.{FetchResponsePartitionData, PartitionFetchInfo} import kafka.cluster.Broker +import kafka.common.TopicAndPartition import kafka.message.{ByteBufferMessageSet, Message} import kafka.utils.{MockScheduler, MockTime, TestUtils, ZkUtils} import org.I0Itec.zkclient.ZkClient @@ -48,7 +50,7 @@ class ReplicaManagerTest { val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect) val config = KafkaConfig.fromProps(props) val zkClient = EasyMock.createMock(classOf[ZkClient]) - val zkUtils = ZkUtils(zkClient, false) + val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false) val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) val time = new MockTime() val jTime = new JMockTime @@ -85,7 +87,7 @@ class ReplicaManagerTest { rm.checkpointHighWatermarks() } finally { // shutdown the replica manager upon test completion - rm.shutdown(false) + rm.shutdown(checkpointHW = false) metrics.close() } } @@ -113,7 +115,7 @@ class ReplicaManagerTest { messagesPerPartition = Map(new TopicPartition("test1", 0) -> new ByteBufferMessageSet(new Message("first message".getBytes))), responseCallback = callback) } finally { - rm.shutdown(false) + rm.shutdown(checkpointHW = false) metrics.close() } @@ -126,7 +128,7 @@ class ReplicaManagerTest { props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath) val config = KafkaConfig.fromProps(props) val zkClient = EasyMock.createMock(classOf[ZkClient]) - val zkUtils = ZkUtils(zkClient, false) + val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false) val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) val time = new MockTime() val jTime = new JMockTime @@ -135,10 +137,16 @@ class ReplicaManagerTest { new AtomicBoolean(false)) try { - var callbackFired = false - def callback(responseStatus: Map[TopicPartition, PartitionResponse]) = { + var produceCallbackFired = false + def produceCallback(responseStatus: Map[TopicPartition, PartitionResponse]) = { assertEquals("Should give NotLeaderForPartitionException", Errors.NOT_LEADER_FOR_PARTITION.code, responseStatus.values.head.errorCode) - callbackFired = true + produceCallbackFired = true + } + + var fetchCallbackFired = false + def fetchCallback(responseStatus: Map[TopicAndPartition, FetchResponsePartitionData]) = { + assertEquals("Should give NotLeaderForPartitionException", Errors.NOT_LEADER_FOR_PARTITION.code, responseStatus.values.head.error) + fetchCallbackFired = true } val aliveBrokers = Seq(new Broker(0, "host0", 0), new Broker(1, "host1", 1)) @@ -146,12 +154,14 @@ class ReplicaManagerTest { EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes() EasyMock.replay(metadataCache) + val brokerList : java.util.List[Integer] = Seq[Integer](0, 1).asJava + val brokerSet : java.util.Set[Integer] = Set[Integer](0, 1).asJava + val partition = rm.getOrCreatePartition(topic, 0) partition.getOrCreateReplica(0) // Make this replica the leader. val leaderAndIsrRequest1 = new LeaderAndIsrRequest(0, 0, - collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, Seq(0, 1).asJava.asInstanceOf[java.util.List[Integer]], - 0, Set(0, 1).asJava.asInstanceOf[java.util.Set[Integer]])).asJava, + collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 0, 0, brokerList, 0, brokerSet)).asJava, Set(new BrokerEndPoint(0, "host1", 0), new BrokerEndPoint(1, "host2", 1)).asJava) rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, metadataCache, (_, _) => {}) rm.getLeaderReplicaIfLocal(topic, 0) @@ -162,18 +172,26 @@ class ReplicaManagerTest { requiredAcks = -1, internalTopicsAllowed = false, messagesPerPartition = Map(new TopicPartition(topic, 0) -> new ByteBufferMessageSet(new Message("first message".getBytes))), - responseCallback = callback) + responseCallback = produceCallback) + + // Fetch some messages + rm.fetchMessages( + timeout = 1000, + replicaId = -1, + fetchMinBytes = 100000, + fetchInfo = collection.immutable.Map(new TopicAndPartition(topic, 0) -> new PartitionFetchInfo(0, 100000)), + responseCallback = fetchCallback) // Make this replica the follower val leaderAndIsrRequest2 = new LeaderAndIsrRequest(0, 0, - collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 1, 1, Seq(0, 1).asJava.asInstanceOf[java.util.List[Integer]], - 0, Set(0, 1).asJava.asInstanceOf[java.util.Set[Integer]])).asJava, + collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, 1, 1, brokerList, 0, brokerSet)).asJava, Set(new BrokerEndPoint(0, "host1", 0), new BrokerEndPoint(1, "host2", 1)).asJava) rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, metadataCache, (_, _) => {}) - assertTrue(callbackFired) + assertTrue(produceCallbackFired) + assertTrue(fetchCallbackFired) } finally { - rm.shutdown(false) + rm.shutdown(checkpointHW = false) metrics.close() } } From 0642e5ff6a079ca0f89f553ab751bcc1bdfab34d Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Fri, 11 Mar 2016 09:35:09 -0800 Subject: [PATCH 7/7] Addressed Joel and Ismaels' comments. --- core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 8d6c84ce4c46c..a5a8df1e32aaf 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -97,7 +97,7 @@ class ReplicaManagerTest { val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect) val config = KafkaConfig.fromProps(props) val zkClient = EasyMock.createMock(classOf[ZkClient]) - val zkUtils = ZkUtils(zkClient, false) + val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false) val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray) val time = new MockTime() val jTime = new JMockTime