Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
f8be865
KAFKA-1387: First cut, node dependency on curator
fpj Aug 23, 2015
b8f901b
KAFKA-1387: First cut, node dependency on curator
fpj Aug 23, 2015
2369e66
Merge branch '1387' of https://github.com/fpj/kafka into 1387
fpj Aug 23, 2015
f03c301
KAFKA-1387: First cut, node dependency on curator
fpj Aug 23, 2015
d8eab9e
KAFKA-1387: Some polishing
fpj Aug 24, 2015
b7cbe5d
KAFKA-1387: Style fixes
fpj Aug 24, 2015
336f67c
KAFKA-1387: More style fixes
fpj Aug 24, 2015
201ab2d
Merge branch '1387' of https://github.com/fpj/kafka into 1387
fpj Aug 24, 2015
9961665
KAFKA-1387: First cut, node dependency on curator
fpj Aug 23, 2015
b52c124
KAFKA-1387: Some polishing
fpj Aug 24, 2015
b2400a0
KAFKA-1387: Style fixes
fpj Aug 24, 2015
888f6e0
KAFKA-1387: More style fixes
fpj Aug 24, 2015
d675b02
KAFKA-1387
fpj Aug 29, 2015
4c83ac2
KAFKA-1387: Removing whitespaces.
fpj Aug 29, 2015
240b51a
Merge branch '1387' of https://github.com/fpj/kafka into 1387
fpj Aug 29, 2015
af28ca1
KAFKA-1387: Added checker back and other improvements.
fpj Sep 2, 2015
3a46b28
KAFKA-1387: Removed messages used for local debugging.
fpj Sep 2, 2015
c316d7b
Merge remote-tracking branch 'upstream/trunk' into 1387
fpj Sep 2, 2015
d694fd6
KAFKA-1387: Re-adding imports removed by mistake.
fpj Sep 2, 2015
f4aa131
Merge remote-tracking branch 'upstream/trunk' into 1387
fpj Sep 3, 2015
ddc41d9
KAFKA-1387: Added test cases and did some general cleanup.
fpj Sep 8, 2015
ed7345a
KAFKA-1387: Removed whitespace.
fpj Sep 8, 2015
2b2ad5b
KAFKA-1387: Adding debug.
fpj Sep 11, 2015
737d493
Merge remote-tracking branch 'upstream/trunk' into 1387
fpj Sep 14, 2015
f6ce8de
Merge remote-tracking branch 'upstream/trunk' into 1387
fpj Sep 14, 2015
8f351b3
KAFKA-1387: Added debug messages
fpj Sep 19, 2015
f2e738f
KAFKA-1387: Updating lines added for test debugging.
fpj Sep 20, 2015
866c984
Merge remote-tracking branch 'upstream/trunk' into 1387
fpj Sep 20, 2015
6ed03c8
Merge remote-tracking branch 'upstream/trunk' into 1387
fpj Sep 22, 2015
422f320
KAFKA-1387: Addressing first batch of comments.
fpj Sep 22, 2015
b7e45e7
KAFKA-1387: Simplified the whole mechanism, removing the watch and re…
fpj Sep 23, 2015
d33242e
Merge remote-tracking branch 'upstream/trunk' into 1387
fpj Sep 23, 2015
84cacc2
KAFKA-1387: Second round of comments addressed.
fpj Sep 24, 2015
390fc11
KAFKA-1387: Third round of comments.
fpj Sep 24, 2015
7a17280
KAFKA-1387: More spaces removed and parameters aligned.
fpj Sep 24, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,12 @@ import kafka.utils.CoreUtils.inLock
import kafka.utils.ZkUtils._
import kafka.utils._
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient}
import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient, ZkConnection}
import org.apache.zookeeper.Watcher.Event.KeeperState

import scala.collection._
import scala.collection.JavaConversions._


/**
* This class handles the consumer's interaction with ZooKeeper
*
Expand Down Expand Up @@ -90,6 +89,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
private val rebalanceLock = new Object
private var fetcher: Option[ConsumerFetcherManager] = None
private var zkClient: ZkClient = null
private var zkConnection : ZkConnection = null
private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
private val checkpointedZkOffsets = new Pool[TopicAndPartition, Long]
private val topicThreadIdAndQueues = new Pool[(String, ConsumerThreadId), BlockingQueue[FetchedDataChunk]]
Expand Down Expand Up @@ -178,7 +178,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,

private def connectZk() {
info("Connecting to zookeeper instance at " + config.zkConnect)
zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
val (client, connection) = ZkUtils.createZkClientAndConnection(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
zkClient = client
zkConnection = connection
}

// Blocks until the offset manager is located and a channel is established to it.
Expand Down Expand Up @@ -261,9 +263,12 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val timestamp = SystemTime.milliseconds.toString
val consumerRegistrationInfo = Json.encode(Map("version" -> 1, "subscription" -> topicCount.getTopicCountMap, "pattern" -> topicCount.pattern,
"timestamp" -> timestamp))
val zkWatchedEphemeral = new ZKCheckedEphemeral(dirs.
consumerRegistryDir + "/" + consumerIdString,
consumerRegistrationInfo,
zkConnection.getZookeeper)
zkWatchedEphemeral.create()

createEphemeralPathExpectConflictHandleZKBug(zkClient, dirs.consumerRegistryDir + "/" + consumerIdString, consumerRegistrationInfo, null,
(consumerZKString, consumer) => true, config.zkSessionTimeoutMs)
info("end registering consumer " + consumerIdString + " in ZK")
}

Expand Down
7 changes: 4 additions & 3 deletions core/src/main/scala/kafka/controller/KafkaController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ import kafka.utils.CoreUtils._
import org.apache.zookeeper.Watcher.Event.KeeperState
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.Time
import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient}
import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient, ZkConnection}
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
import java.util.concurrent.locks.ReentrantLock
import kafka.server._
import kafka.common.TopicAndPartition

class ControllerContext(val zkClient: ZkClient,
val zkConnection: ZkConnection,
val zkSessionTimeout: Int) {
var controllerChannelManager: ControllerChannelManager = null
val controllerLock: ReentrantLock = new ReentrantLock()
Expand Down Expand Up @@ -153,11 +154,11 @@ object KafkaController extends Logging {
}
}

class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
class KafkaController(val config : KafkaConfig, zkClient: ZkClient, zkConnection: ZkConnection, val brokerState: BrokerState, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {
this.logIdent = "[Controller " + config.brokerId + "]: "
private var isRunning = true
private val stateChangeLogger = KafkaController.stateChangeLogger
val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
val controllerContext = new ControllerContext(zkClient, zkConnection, config.zkSessionTimeoutMs)
val partitionStateMachine = new PartitionStateMachine(this)
val replicaStateMachine = new ReplicaStateMachine(this)
private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
Expand Down
12 changes: 5 additions & 7 deletions core/src/main/scala/kafka/server/KafkaHealthcheck.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import kafka.cluster.EndPoint
import kafka.utils._
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.zookeeper.Watcher.Event.KeeperState
import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
import org.I0Itec.zkclient.{IZkStateListener, ZkClient, ZkConnection}
import java.net.InetAddress


Expand All @@ -35,8 +35,8 @@ import java.net.InetAddress
*/
class KafkaHealthcheck(private val brokerId: Int,
private val advertisedEndpoints: Map[SecurityProtocol, EndPoint],
private val zkSessionTimeoutMs: Int,
private val zkClient: ZkClient) extends Logging {
private val zkClient: ZkClient,
private val zkConnection: ZkConnection) extends Logging {

val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + brokerId
val sessionExpireListener = new SessionExpireListener
Expand All @@ -62,7 +62,7 @@ class KafkaHealthcheck(private val brokerId: Int,
// only PLAINTEXT is supported as default
// if the broker doesn't listen on PLAINTEXT protocol, an empty endpoint will be registered and older clients will be unable to connect
val plaintextEndpoint = updatedEndpoints.getOrElse(SecurityProtocol.PLAINTEXT, new EndPoint(null,-1,null))
ZkUtils.registerBrokerInZk(zkClient, brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, zkSessionTimeoutMs, jmxPort)
ZkUtils.registerBrokerInZk(zkClient, zkConnection, brokerId, plaintextEndpoint.host, plaintextEndpoint.port, updatedEndpoints, jmxPort)
}

/**
Expand All @@ -71,9 +71,7 @@ class KafkaHealthcheck(private val brokerId: Int,
*/
class SessionExpireListener() extends IZkStateListener {
@throws(classOf[Exception])
def handleStateChanged(state: KeeperState) {
// do nothing, since zkclient will do reconnect for us.
}
def handleStateChanged(state: KeeperState) {}

/**
* Called after the zookeeper session has expired and a new session has been created. You would have to re-create
Expand Down
17 changes: 10 additions & 7 deletions core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import org.apache.kafka.common.utils.AppInfoParser

import scala.collection.mutable
import scala.collection.JavaConverters._
import org.I0Itec.zkclient.ZkClient
import org.I0Itec.zkclient.{ZkClient, ZkConnection}
import kafka.controller.{ControllerStats, KafkaController}
import kafka.cluster.{EndPoint, Broker}
import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBrokerIdException}
Expand Down Expand Up @@ -129,6 +129,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
val metadataCache: MetadataCache = new MetadataCache(config.brokerId)

var zkClient: ZkClient = null
var zkConnection: ZkConnection = null
val correlationId: AtomicInteger = new AtomicInteger(0)
val brokerMetaPropsFile = "meta.properties"
val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator +brokerMetaPropsFile)))).toMap
Expand Down Expand Up @@ -164,7 +165,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
kafkaScheduler.startup()

/* setup zookeeper */
zkClient = initZk()
val (client, connection) = initZk()
zkClient = client
zkConnection = connection

/* start log manager */
logManager = createLogManager(zkClient, brokerState)
Expand All @@ -183,7 +186,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
replicaManager.startup()

/* start kafka controller */
kafkaController = new KafkaController(config, zkClient, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)
kafkaController = new KafkaController(config, zkClient, zkConnection, brokerState, kafkaMetricsTime, metrics, threadNamePrefix)
kafkaController.startup()

/* start kafka coordinator */
Expand Down Expand Up @@ -220,7 +223,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
else
(protocol, endpoint)
}
kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, config.zkSessionTimeoutMs, zkClient)
kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, listeners, zkClient, zkConnection)
kafkaHealthcheck.startup()

/* register broker metrics */
Expand All @@ -242,7 +245,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
}
}

private def initZk(): ZkClient = {
private def initZk(): (ZkClient, ZkConnection) = {
info("Connecting to zookeeper on " + config.zkConnect)

val chroot = {
Expand All @@ -260,9 +263,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
zkClientForChrootCreation.close()
}

val zkClient = ZkUtils.createZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
val (zkClient, zkConnection) = ZkUtils.createZkClientAndConnection(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs)
ZkUtils.setupCommonPaths(zkClient)
zkClient
(zkClient, zkConnection)
}


Expand Down
11 changes: 6 additions & 5 deletions core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package kafka.server

import kafka.utils.ZkUtils._
import kafka.utils.CoreUtils._
import kafka.utils.{Json, SystemTime, Logging}
import kafka.utils.{Json, SystemTime, Logging, ZKCheckedEphemeral}
import org.I0Itec.zkclient.exception.ZkNodeExistsException
import org.I0Itec.zkclient.IZkDataListener
import kafka.controller.ControllerContext
Expand Down Expand Up @@ -56,7 +56,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
case None => -1
}
}

def elect: Boolean = {
val timestamp = SystemTime.milliseconds.toString
val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))
Expand All @@ -73,9 +73,10 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
}

try {
createEphemeralPathExpectConflictHandleZKBug(controllerContext.zkClient, electionPath, electString, brokerId,
(controllerString : String, leaderId : Any) => KafkaController.parseControllerId(controllerString) == leaderId.asInstanceOf[Int],
controllerContext.zkSessionTimeout)
val zkCheckedEphemeral = new ZKCheckedEphemeral(electionPath,
electString,
controllerContext.zkConnection.getZookeeper)
zkCheckedEphemeral.create()
info(brokerId + " successfully elected as leader")
leaderId = brokerId
onBecomingLeader()
Expand Down
Loading