Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
6a1ca42
Initial pass, main code compiles
fpj Oct 12, 2015
afeafab
Changes to tests to accomodate the refactoring of ZkUtils.
fpj Oct 13, 2015
66b116a
Removed whitespaces.
fpj Oct 13, 2015
36c3720
KAFKA-2639: Added close() method to ZkUtils
fpj Oct 13, 2015
a7ae433
KAFKA-2639: Fixed PartitionAssignorTest
fpj Oct 13, 2015
78ee23d
KAFKA-2639: Fixed ReplicaManagerTest
fpj Oct 13, 2015
2e888de
KAFKA-2639: Fixed ZKPathTest
fpj Oct 13, 2015
b94fd4b
KAFKA-2639: Made isSecure a parameter of the factory methods for ZkUtils
fpj Oct 13, 2015
bd46f61
KAFKA-2639: Fixed KafkaConfigTest.
fpj Oct 13, 2015
8c69f23
KAFKA-2639: Removing config via KafkaConfig.
fpj Oct 14, 2015
00a8169
KAFKA-2639: Removed whitespaces.
fpj Oct 14, 2015
6b2fd2a
Adding initial configuration and support to set acls
fpj Oct 14, 2015
311612f
KAFKA-2639: Removed unrelated comment from ZkUtils.
fpj Oct 14, 2015
f76c72a
KAFKA-2641: First cut at the ZK Security Migration Tool.
fpj Oct 14, 2015
fb9a52a
KAFKA-2639: Moved isSecure to JaasUtils in clients.
fpj Oct 14, 2015
8314c7f
KAFKA-2639: Covering more zk system properties.
fpj Oct 14, 2015
76a802d
KAFKA-2639: Small update to log message and exception message in Jaas…
fpj Oct 14, 2015
45e39b6
KAFKA-2641: Adding script and moving the tool to the admin package.
fpj Oct 15, 2015
83e1dc5
Merge branch 'KAFKA-2639' into KAFKA-2641
fpj Oct 15, 2015
02f1ae2
Merge remote-tracking branch 'upstream/trunk' into KAFKA-2641
fpj Oct 15, 2015
fd799b0
Merge remote-tracking branch 'upstream/trunk' into KAFKA-2639
fpj Oct 15, 2015
cb0c751
KAFKA-2641: Polished migration tool code.
fpj Oct 15, 2015
58e17b2
KAFKA-2639: create->apply and isSecure->isZkSecurityEnabled
fpj Oct 15, 2015
943f30d
Merge branch 'KAFKA-2639' into KAFKA-2641
fpj Oct 16, 2015
fb56da6
KAFKA-2641: Added license header to ZkSecurityMigrator.
fpj Oct 17, 2015
cf5ab6a
KAFKA-2641: Fixed bug in ZkSecurityMigrator.
fpj Oct 17, 2015
3c2c8e7
Merge remote-tracking branch 'upstream/trunk' into KAFKA-2641
fpj Oct 19, 2015
1a8d6b7
KAFKA-2641: Added functionality to set acls recursively.
fpj Oct 19, 2015
f072d15
KAFKA-2641: System property has priority over config parameter.
fpj Oct 19, 2015
1e66b3f
KAFKA-2641: Build child path.
fpj Oct 19, 2015
2cb1d73
KAFKA-2641: Changed the configuration of ZkUtils in KafkaServer.
fpj Oct 19, 2015
175ff18
KAFKA-2641: Changes to migration tool to get it to work.
fpj Oct 21, 2015
ecdfb54
KAFKA-2641: Addressed comments.
fpj Oct 21, 2015
60a0f00
KAFKA-2641: Adding a explicit list of paths to secure.
fpj Oct 22, 2015
a0273b8
KAFKA-2641: Improvements and additions to the config of the migration…
fpj Oct 22, 2015
5f49946
KAFKA-2641: Using futures.
fpj Oct 22, 2015
5e812b4
KAFKA-2641: Improvements to the migration tool, added anti-migration …
fpj Oct 22, 2015
8c58e0e
KAFKA-2641: Added description to the migration tool.
fpj Oct 22, 2015
1a2b361
KAFKA-2641: Addressing round of comments.
fpj Oct 23, 2015
4f85771
Make the code more functional.
ijuma Oct 23, 2015
1fd97d6
Replace `StringBuilder` with string interpolation
ijuma Oct 23, 2015
316f831
Merge pull request #1 from ijuma/KAFKA-2641
fpj Oct 23, 2015
dbb05fb
KAFKA-2641: Few improvements to the migrator tool.
fpj Oct 23, 2015
36cef5f
KAFKA-2641: Fixed issue with the management of futures.
fpj Oct 23, 2015
b317251
KAFKA-2641: Moved dequeue to eliminate redundant match.
fpj Oct 23, 2015
332b125
KAFKA-2641: Removed thread executor.
fpj Oct 23, 2015
0674774
KAFKA-2641: Improvement to output messages.
fpj Oct 23, 2015
d9f3618
KAFKA-2641: Replacing error with System.out.println in the migration …
fpj Oct 23, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions bin/zookeeper-security-migration.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

exec $(dirname $0)/kafka-run-class.sh kafka.admin.ZkSecurityMigrator $@
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ public static boolean isZkSecurityEnabled(String loginConfigFile) {
throw new KafkaException("Exception while determining if ZooKeeper is secure");
}
}
/*
* Tests fail if we don't reset the login configuration. It is unclear
* what is actually triggering this bug.
*/
Configuration.setConfiguration(null);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we were discussing with @junrao, I think we need to either find a way not to need this or to at least do it in a safe way within our app (ie reload it atomically).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I'm not comfortable with this at all, it doesn't sound robust. at the same time, I want to separate bugs in my code from bugs elsewhere, so I'm leaving this here for now, I need to understand what's causing this.


return isSecurityEnabled;
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/admin/AdminUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ object AdminUtils extends Logging {

def topicExists(zkUtils: ZkUtils, topic: String): Boolean =
zkUtils.zkClient.exists(getTopicPath(topic))

def createTopic(zkUtils: ZkUtils,
topic: String,
partitions: Int,
Expand Down
237 changes: 237 additions & 0 deletions core/src/main/scala/kafka/admin/ZkSecurityMigrator.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.admin
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need the license header.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import joptsimple.OptionParser
import org.I0Itec.zkclient.exception.ZkException
import kafka.utils.{Logging, ZkUtils, CommandLineUtils}
import org.apache.log4j.Level
import org.apache.kafka.common.security.JaasUtils
import org.apache.zookeeper.AsyncCallback.{ChildrenCallback, StatCallback}
import org.apache.zookeeper.data.Stat
import org.apache.zookeeper.KeeperException
import org.apache.zookeeper.KeeperException.Code
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection._
import scala.collection.mutable.Queue
import scala.concurrent._
import scala.concurrent.duration._

/**
* This tool is to be used when making access to ZooKeeper authenticated or
* the other way around, when removing authenticated access. The exact steps
* to migrate a Kafka cluster from unsecure to secure with respect to ZooKeeper
* access are the following:
*
* 1- Perform a rolling upgrade of Kafka servers, setting zookeeper.set.acl to false
* and passing a valid JAAS login file via the system property
* java.security.auth.login.config
* 2- Perform a second rolling upgrade keeping the system property for the login file
* and now setting zookeeper.set.acl to true
* 3- Finally run this tool. There is a script under ./bin. Run
* ./bin/zookeeper-security-migration --help
* to see the configuration parameters. An example of running it is the following:
* ./bin/zookeeper-security-migration --zookeeper.acl=secure --zookeeper.connection=localhost:2181
*
* To convert a cluster from secure to unsecure, we need to perform the following
* steps:
* 1- Perform a rolling upgrade setting zookeeper.set.acl to false for each server
* 2- Run this migration tool, setting zookeeper.acl to unsecure
* 3- Perform another rolling upgrade to remove the system property setting the
* login file (java.security.auth.login.config).
*/

object ZkSecurityMigrator extends Logging {
val usageMessage = ("ZooKeeper Migration Tool Help. This tool updates the ACLs of "
+ "znodes as part of the process of setting up ZooKeeper "
+ "authentication.")

def run(args: Array[String]) {
var jaasFile = System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
val parser = new OptionParser()

val zkAclOpt = parser.accepts("zookeeper.acl", "Indicates whether to make the Kafka znodes in ZooKeeper secure or unsecure."
+ " The options are 'secure' and 'unsecure'").withRequiredArg().ofType(classOf[String])
val jaasFileOpt = parser.accepts("jaas.file", "JAAS Config file.").withOptionalArg().ofType(classOf[String])
val zkUrlOpt = parser.accepts("zookeeper.connect", "Sets the ZooKeeper connect string (ensemble). This parameter " +
"takes a comma-separated list of host:port pairs.").withRequiredArg().defaultsTo("localhost:2181").
ofType(classOf[String])
val zkSessionTimeoutOpt = parser.accepts("zookeeper.session.timeout", "Sets the ZooKeeper session timeout.").
withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(30000)
val zkConnectionTimeoutOpt = parser.accepts("zookeeper.connection.timeout", "Sets the ZooKeeper connection timeout.").
withRequiredArg().ofType(classOf[java.lang.Integer]).defaultsTo(30000)
val helpOpt = parser.accepts("help", "Print usage information.")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To support going from secured ZK to unsecured ZK, we will need an option to enable/disable the CREATOR_ALL acl.


val options = parser.parse(args : _*)
if (options.has(helpOpt))
CommandLineUtils.printUsageAndDie(parser, usageMessage)

if ((jaasFile == null) && !options.has(jaasFileOpt)) {
val errorMsg = ("No JAAS configuration file has been specified. Please make sure that you have set either " +
"the system property %s or the option %s".format(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, "--jaas.file"))
System.out.println("ERROR: %s".format(errorMsg))
throw new IllegalArgumentException("Incorrect configuration")
}

if (jaasFile == null) {
jaasFile = options.valueOf(jaasFileOpt)
System.setProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM, jaasFile)
}

if (!JaasUtils.isZkSecurityEnabled(jaasFile)) {
val errorMsg = "Security isn't enabled, most likely the file isn't set properly: %s".format(jaasFile)
System.out.println("ERROR: %s".format(errorMsg))
throw new IllegalArgumentException("Incorrect configuration")
}

val zkAcl: Boolean = options.valueOf(zkAclOpt) match {
case "secure" =>
info("zookeeper.acl option is secure")
true
case "unsecure" =>
info("zookeeper.acl option is unsecure")
false
case _ =>
CommandLineUtils.printUsageAndDie(parser, usageMessage)
}
val zkUrl = options.valueOf(zkUrlOpt)
val zkSessionTimeout = options.valueOf(zkSessionTimeoutOpt).intValue
val zkConnectionTimeout = options.valueOf(zkConnectionTimeoutOpt).intValue
val zkUtils = ZkUtils(zkUrl, zkSessionTimeout, zkConnectionTimeout, zkAcl)
val migrator = new ZkSecurityMigrator(zkUtils)
migrator.run()
}

def main(args: Array[String]) {
try {
run(args)
} catch {
case e: Exception =>
e.printStackTrace()
}
}
}

class ZkSecurityMigrator(zkUtils: ZkUtils) extends Logging {
private val workQueue = new LinkedBlockingQueue[Runnable]
private val futures = new Queue[Future[String]]

private def setAclsRecursively(path: String) = {
info("Setting ACL for path %s".format(path))
val setPromise = Promise[String]
val childrenPromise = Promise[String]
futures.synchronized {
futures += setPromise.future
futures += childrenPromise.future
}
zkUtils.zkConnection.getZookeeper.setACL(path, ZkUtils.DefaultAcls(zkUtils.isSecure), -1, SetACLCallback, setPromise)
zkUtils.zkConnection.getZookeeper.getChildren(path, false, GetChildrenCallback, childrenPromise)
}

private object GetChildrenCallback extends ChildrenCallback {
def processResult(rc: Int,
path: String,
ctx: Object,
children: java.util.List[String]) {
val zkHandle = zkUtils.zkConnection.getZookeeper
val promise = ctx.asInstanceOf[Promise[String]]
Code.get(rc) match {
case Code.OK =>
// Set ACL for each child
for (child <- children.asScala)
setAclsRecursively(s"$path/$child")
promise success "done"
case Code.CONNECTIONLOSS =>
zkHandle.getChildren(path, false, GetChildrenCallback, ctx)
case Code.NONODE =>
warn("Node is gone, it could be have been legitimately deleted: %s".format(path))
promise success "done"
case Code.SESSIONEXPIRED =>
// Starting a new session isn't really a problem, but it'd complicate
// the logic of the tool, so we quit and let the user re-run it.
System.out.println("ZooKeeper session expired while changing ACLs")
promise failure ZkException.create(KeeperException.create(Code.get(rc)))
case _ =>
System.out.println("Unexpected return code: %d".format(rc))
promise failure ZkException.create(KeeperException.create(Code.get(rc)))
}
}
}

private object SetACLCallback extends StatCallback {
def processResult(rc: Int,
path: String,
ctx: Object,
stat: Stat) {
val zkHandle = zkUtils.zkConnection.getZookeeper
val promise = ctx.asInstanceOf[Promise[String]]

Code.get(rc) match {
case Code.OK =>
info("Successfully set ACLs for %s".format(path))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that we need to print this instead of logging it.

promise success "done"
case Code.CONNECTIONLOSS =>
zkHandle.setACL(path, ZkUtils.DefaultAcls(zkUtils.isSecure), -1, SetACLCallback, ctx)
case Code.NONODE =>
warn("Znode is gone, it could be have been legitimately deleted: %s".format(path))
promise success "done"
case Code.SESSIONEXPIRED =>
// Starting a new session isn't really a problem, but it'd complicate
// the logic of the tool, so we quit and let the user re-run it.
System.out.println("ZooKeeper session expired while changing ACLs")
promise failure ZkException.create(KeeperException.create(Code.get(rc)))
case _ =>
System.out.println("Unexpected return code: %d".format(rc))
promise failure ZkException.create(KeeperException.create(Code.get(rc)))
}
}
}

private def run(): Unit = {
try {
for (path <- zkUtils.securePersistentZkPaths) {
debug("Going to set ACL for %s".format(path))
zkUtils.makeSurePersistentPathExists(path)
setAclsRecursively(path)
}

@tailrec
def recurse(): Unit = {
val future = futures.synchronized {
futures.headOption
}
future match {
case Some(a) =>
Await.result(a, 6000 millis)
futures.synchronized { futures.dequeue }
recurse
case None =>
}
}
recurse()

} finally {
zkUtils.close
}
}

}
5 changes: 5 additions & 0 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ object Defaults {
/** ********* Zookeeper Configuration ***********/
val ZkSessionTimeoutMs = 6000
val ZkSyncTimeMs = 2000
val ZkEnableSecureAcls = false

/** ********* General Configuration ***********/
val MaxReservedBrokerId = 1000
Expand Down Expand Up @@ -186,6 +187,7 @@ object KafkaConfig {
val ZkSessionTimeoutMsProp = "zookeeper.session.timeout.ms"
val ZkConnectionTimeoutMsProp = "zookeeper.connection.timeout.ms"
val ZkSyncTimeMsProp = "zookeeper.sync.time.ms"
val ZkEnableSecureAclsProp = "zookeeper.set.acl"
/** ********* General Configuration ***********/
val MaxReservedBrokerIdProp = "reserved.broker.max.id"
val BrokerIdProp = "broker.id"
Expand Down Expand Up @@ -328,6 +330,7 @@ object KafkaConfig {
val ZkSessionTimeoutMsDoc = "Zookeeper session timeout"
val ZkConnectionTimeoutMsDoc = "The max time that the client waits to establish a connection to zookeeper"
val ZkSyncTimeMsDoc = "How far a ZK follower can be behind a ZK leader"
val ZkEnableSecureAclsDoc = "Set client to use secure ACLs"
/** ********* General Configuration ***********/
val MaxReservedBrokerIdDoc = "Max number that can be used for a broker.id"
val BrokerIdDoc = "The broker id for this server. " +
Expand Down Expand Up @@ -505,6 +508,7 @@ object KafkaConfig {
.define(ZkSessionTimeoutMsProp, INT, Defaults.ZkSessionTimeoutMs, HIGH, ZkSessionTimeoutMsDoc)
.define(ZkConnectionTimeoutMsProp, INT, HIGH, ZkConnectionTimeoutMsDoc, false)
.define(ZkSyncTimeMsProp, INT, Defaults.ZkSyncTimeMs, LOW, ZkSyncTimeMsDoc)
.define(ZkEnableSecureAclsProp, BOOLEAN, Defaults.ZkEnableSecureAcls, HIGH, ZkEnableSecureAclsDoc)

/** ********* General Configuration ***********/
.define(MaxReservedBrokerIdProp, INT, Defaults.MaxReservedBrokerId, atLeast(0), MEDIUM, MaxReservedBrokerIdProp)
Expand Down Expand Up @@ -684,6 +688,7 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka
val zkConnectionTimeoutMs: java.lang.Integer =
Option(getInt(KafkaConfig.ZkConnectionTimeoutMsProp)).getOrElse(getInt(KafkaConfig.ZkSessionTimeoutMsProp))
val zkSyncTimeMs: Int = getInt(KafkaConfig.ZkSyncTimeMsProp)
val zkEnableSecureAcls: Boolean = getBoolean(KafkaConfig.ZkEnableSecureAclsProp)

/** ********* General Configuration ***********/
val maxReservedBrokerId: Int = getInt(KafkaConfig.MaxReservedBrokerIdProp)
Expand Down
9 changes: 7 additions & 2 deletions core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,17 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
""
}

val secureAclsEnabled = JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)) && config.zkEnableSecureAcls
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry if this was already discussed, but shouldn't we be throwing a configuration error if someone sets zkEnableSecureAcls to true and isZkSecurityEnabled is false?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.


if(config.zkEnableSecureAcls && !secureAclsEnabled) {
throw new java.lang.SecurityException("zkEnableSecureAcls is true, but the verification of the JAAS login file failed.")
}
if (chroot.length > 1) {
val zkConnForChrootCreation = config.zkConnect.substring(0, config.zkConnect.indexOf("/"))
val zkClientForChrootCreation = ZkUtils(zkConnForChrootCreation,
config.zkSessionTimeoutMs,
config.zkConnectionTimeoutMs,
JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
secureAclsEnabled)
zkClientForChrootCreation.makeSurePersistentPathExists(chroot)
info("Created zookeeper path " + chroot)
zkClientForChrootCreation.zkClient.close()
Expand All @@ -267,7 +272,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr
val zkUtils = ZkUtils(config.zkConnect,
config.zkSessionTimeoutMs,
config.zkConnectionTimeoutMs,
JaasUtils.isZkSecurityEnabled(System.getProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)))
secureAclsEnabled)
zkUtils.setupCommonPaths()
zkUtils
}
Expand Down
15 changes: 12 additions & 3 deletions core/src/main/scala/kafka/utils/ZkUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ object ZkUtils {
* Get calls that only depend on static paths
*/
def getTopicPath(topic: String): String = {
BrokerTopicsPath + "/" + topic
ZkUtils.BrokerTopicsPath + "/" + topic
}

def getTopicPartitionsPath(topic: String): String = {
Expand All @@ -126,7 +126,7 @@ object ZkUtils {
getTopicPartitionPath(topic, partitionId) + "/" + "state"

def getEntityConfigRootPath(entityType: String): String =
EntityConfigPath + "/" + entityType
ZkUtils.EntityConfigPath + "/" + entityType

def getEntityConfigPath(entityType: String, entity: String): String =
getEntityConfigRootPath(entityType) + "/" + entity
Expand All @@ -149,6 +149,15 @@ class ZkUtils(val zkClient: ZkClient,
BrokerSequenceIdPath,
IsrChangeNotificationPath)

val securePersistentZkPaths = Seq(BrokerIdsPath,
BrokerTopicsPath,
EntityConfigChangesPath,
getEntityConfigRootPath(ConfigType.Topic),
getEntityConfigRootPath(ConfigType.Client),
DeleteTopicsPath,
BrokerSequenceIdPath,
IsrChangeNotificationPath)

val DefaultAcls: java.util.List[ACL] = ZkUtils.DefaultAcls(isSecure)

def getController(): Int = {
Expand Down Expand Up @@ -713,7 +722,7 @@ class ZkUtils(val zkClient: ZkClient,
def deletePartition(brokerId: Int, topic: String) {
val brokerIdPath = BrokerIdsPath + "/" + brokerId
zkClient.delete(brokerIdPath)
val brokerPartTopicPath = BrokerTopicsPath + "/" + topic + "/" + brokerId
val brokerPartTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic + "/" + brokerId
zkClient.delete(brokerPartTopicPath)
}

Expand Down
1 change: 0 additions & 1 deletion core/src/test/scala/other/kafka/DeleteZKPath.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ object DeleteZKPath {

val config = new ConsumerConfig(Utils.loadProps(args(0)))
val zkPath = args(1)

val zkUtils = ZkUtils(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, false)

try {
Expand Down
Loading