Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,8 @@ class ReplicaManager(val config: KafkaConfig,
* 2. Mark the replicas as followers so that no more data can be added from the producer clients.
* 3. Stop fetchers for these partitions so that no more data can be added by the replica fetcher threads.
* 4. Truncate the log and checkpoint offsets for these partitions.
* 5. If the broker is not shutting down, add the fetcher to the new leaders.
* 5. Clear the produce and fetch requests in the purgatory
* 6. If the broker is not shutting down, add the fetcher to the new leaders.
*
* The ordering of doing these steps makes sure that the replicas in transition will not
* take any more messages before checkpointing offsets so that all messages before the checkpoint
Expand Down Expand Up @@ -800,6 +801,11 @@ class ReplicaManager(val config: KafkaConfig,
}

logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark.messageOffset)).toMap)
partitionsToMakeFollower.foreach { partition =>
val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topic, partition.partitionId)
tryCompleteDelayedProduce(topicPartitionOperationKey)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In DelayedProduce, we don't recheck if the leader has changed or not. Is tryCompleteDelayedProduce() enough to force the produce request to be completed? We probably need to add a forceComplete() in delayedProducePurgatory?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering about this as well, but I think we do this, right? i.e., it makes a call to Partition.checkEnoughReplicasReachOffset which returns (false, Errors.NOT_LEADER_FOR_PARTITION.code) if the leader is non-local

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's right. Then the patch LGTM.

tryCompleteDelayedFetch(topicPartitionOperationKey)
}

partitionsToMakeFollower.foreach { partition =>
stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition [%s,%d] as part of " +
Expand Down
109 changes: 93 additions & 16 deletions core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,28 @@
package kafka.server


import kafka.api.SerializationTestUtils
import kafka.message.{Message, ByteBufferMessageSet}
import kafka.utils.{ZkUtils, MockScheduler, MockTime, TestUtils}
import org.apache.kafka.common.requests.ProduceRequest

import java.util.concurrent.atomic.AtomicBoolean
import java.io.File
import java.util.concurrent.atomic.AtomicBoolean

import kafka.api.{FetchResponsePartitionData, PartitionFetchInfo}
import kafka.cluster.Broker
import kafka.common.TopicAndPartition
import kafka.message.{ByteBufferMessageSet, Message}
import kafka.utils.{MockScheduler, MockTime, TestUtils, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.LeaderAndIsrRequest
import org.apache.kafka.common.requests.LeaderAndIsrRequest.PartitionState
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.{MockTime => JMockTime}
import org.apache.kafka.common.{BrokerEndPoint, TopicPartition}
import org.easymock.EasyMock
import org.I0Itec.zkclient.ZkClient
import org.junit.Assert.{assertEquals, assertTrue}
import org.junit.Test
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JavaConverters is recommended over JavaConversions. Can you explain why you changed this?


import scala.collection.Map
import scala.collection.JavaConverters._
import scala.collection.Map

class ReplicaManagerTest {

Expand All @@ -47,9 +50,9 @@ class ReplicaManagerTest {
val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
val config = KafkaConfig.fromProps(props)
val zkClient = EasyMock.createMock(classOf[ZkClient])
val zkUtils = ZkUtils(zkClient, false)
val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
val time: MockTime = new MockTime()
val time = new MockTime()
val jTime = new JMockTime
val metrics = new Metrics
val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
Expand All @@ -73,7 +76,7 @@ class ReplicaManagerTest {
val zkClient = EasyMock.createMock(classOf[ZkClient])
val zkUtils = ZkUtils(zkClient, false)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
val time: MockTime = new MockTime()
val time = new MockTime()
val jTime = new JMockTime
val metrics = new Metrics
val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
Expand All @@ -84,7 +87,7 @@ class ReplicaManagerTest {
rm.checkpointHighWatermarks()
} finally {
// shutdown the replica manager upon test completion
rm.shutdown(false)
rm.shutdown(checkpointHW = false)
metrics.close()
}
}
Expand All @@ -94,9 +97,9 @@ class ReplicaManagerTest {
val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
val config = KafkaConfig.fromProps(props)
val zkClient = EasyMock.createMock(classOf[ZkClient])
val zkUtils = ZkUtils(zkClient, false)
val zkUtils = ZkUtils(zkClient, isZkSecurityEnabled = false)
val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
val time: MockTime = new MockTime()
val time = new MockTime()
val jTime = new JMockTime
val metrics = new Metrics
val rm = new ReplicaManager(config, metrics, time, jTime, zkUtils, new MockScheduler(time), mockLogMgr,
Expand All @@ -112,10 +115,84 @@ class ReplicaManagerTest {
messagesPerPartition = Map(new TopicPartition("test1", 0) -> new ByteBufferMessageSet(new Message("first message".getBytes))),
responseCallback = callback)
} finally {
rm.shutdown(false)
rm.shutdown(checkpointHW = false)
metrics.close()
}

TestUtils.verifyNonDaemonThreadsStatus(this.getClass.getName)
}

/**
 * Checks that pending produce and fetch requests are answered with
 * NOT_LEADER_FOR_PARTITION once this broker has processed a LeaderAndIsrRequest
 * that names another broker as the partition leader.
 *
 * Flow: become leader for partition 0 -> issue one produce and one fetch ->
 * become follower -> assert that both response callbacks have been invoked.
 */
@Test
def testClearPurgatoryOnBecomingFollower() {
  val brokerProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
  brokerProps.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
  val brokerConfig = KafkaConfig.fromProps(brokerProps)
  val zkUtils = ZkUtils(EasyMock.createMock(classOf[ZkClient]), isZkSecurityEnabled = false)
  val logManager = TestUtils.createLogManager(brokerConfig.logDirs.map(new File(_)).toArray)
  val mockTime = new MockTime()
  val javaMockTime = new JMockTime
  val metrics = new Metrics
  val replicaManager = new ReplicaManager(brokerConfig, metrics, mockTime, javaMockTime, zkUtils,
    new MockScheduler(mockTime), logManager, new AtomicBoolean(false))

  try {
    // Each callback validates the error code first and only then records that it ran,
    // so a wrong error code leaves the flag unset.
    var sawProduceResponse = false
    def onProduceResponse(status: Map[TopicPartition, PartitionResponse]) = {
      assertEquals("Should give NotLeaderForPartitionException", Errors.NOT_LEADER_FOR_PARTITION.code, status.values.head.errorCode)
      sawProduceResponse = true
    }

    var sawFetchResponse = false
    def onFetchResponse(status: Map[TopicAndPartition, FetchResponsePartitionData]) = {
      assertEquals("Should give NotLeaderForPartitionException", Errors.NOT_LEADER_FOR_PARTITION.code, status.values.head.error)
      sawFetchResponse = true
    }

    val liveBrokers = Seq(new Broker(0, "host0", 0), new Broker(1, "host1", 1))
    val metadataCache = EasyMock.createMock(classOf[MetadataCache])
    EasyMock.expect(metadataCache.getAliveBrokers).andReturn(liveBrokers).anyTimes()
    EasyMock.replay(metadataCache)

    val isr : java.util.List[Integer] = Seq[Integer](0, 1).asJava
    val replicas : java.util.Set[Integer] = Set[Integer](0, 1).asJava

    // Builds a LeaderAndIsrRequest for partition 0 of `topic`; only the leader id and the
    // leader epoch differ between the two requests sent by this test.
    def leaderAndIsrRequestFor(leader: Int, leaderEpoch: Int) = new LeaderAndIsrRequest(0, 0,
      collection.immutable.Map(new TopicPartition(topic, 0) -> new PartitionState(0, leader, leaderEpoch, isr, 0, replicas)).asJava,
      Set(new BrokerEndPoint(0, "host1", 0), new BrokerEndPoint(1, "host2", 1)).asJava)

    val partition = replicaManager.getOrCreatePartition(topic, 0)
    partition.getOrCreateReplica(0)

    // Step 1: broker 0 (this replica) becomes the leader.
    replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequestFor(leader = 0, leaderEpoch = 0), metadataCache, (_, _) => {})
    // Result intentionally unused; presumably this fails if the replica is not the local leader.
    replicaManager.getLeaderReplicaIfLocal(topic, 0)

    // Step 2: produce one message with requiredAcks = -1.
    replicaManager.appendMessages(
      timeout = 1000,
      requiredAcks = -1,
      internalTopicsAllowed = false,
      messagesPerPartition = Map(new TopicPartition(topic, 0) -> new ByteBufferMessageSet(new Message("first message".getBytes))),
      responseCallback = onProduceResponse)

    // Step 3: fetch with a large fetchMinBytes
    // (assumption: large enough that the fetch is not answered immediately - TODO confirm).
    replicaManager.fetchMessages(
      timeout = 1000,
      replicaId = -1,
      fetchMinBytes = 100000,
      fetchInfo = collection.immutable.Map(new TopicAndPartition(topic, 0) -> new PartitionFetchInfo(0, 100000)),
      responseCallback = onFetchResponse)

    // Step 4: broker 1 becomes the leader, turning this replica into a follower.
    replicaManager.becomeLeaderOrFollower(1, leaderAndIsrRequestFor(leader = 1, leaderEpoch = 1), metadataCache, (_, _) => {})

    assertTrue(sawProduceResponse)
    assertTrue(sawFetchResponse)
  } finally {
    replicaManager.shutdown(checkpointHW = false)
    metrics.close()
  }
}
}