Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/admin/TopicCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,8 @@ object TopicCommand extends Logging {

private val disableRackAware = parser.accepts("disable-rack-aware", "Disable rack aware replica assignment")

private val forceOpt = parser.accepts("force", "Suppress console prompts")
// This is not currently used, but we keep it for compatibility
parser.accepts("force", "Suppress console prompts")

private val excludeInternalTopicOpt = parser.accepts("exclude-internal", "exclude internal topics when running list or describe command. The internal topics will be listed by default")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,13 +232,8 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
numMembersAwaitingJoin -= 1
}

if (isLeader(memberId)) {
leaderId = if (members.isEmpty) {
None
} else {
Some(members.keys.head)
}
}
if (isLeader(memberId))
leaderId = members.keys.headOption
}

def isPendingMember(memberId: String): Boolean = pendingMembers.contains(memberId) && !has(memberId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,6 @@ class DelayedElectPreferredLeader(delayMs: Long,
responseCallback(timedout ++ fullResults)
}

private def timeoutWaiting = {
waitingPartitions.map(partition => partition -> new ApiError(Errors.REQUEST_TIMED_OUT, null)).toMap
}

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tombentley Please check that we didn't intend to use this.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this can be removed since the logic is folded into onComplete() already.

/**
* Try to complete the delayed operation by first checking if the operation
* can be completed by now. If yes execute the completion logic by calling
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1402,7 +1402,7 @@ class KafkaApis(val requestChannel: RequestChannel,
setTopics(results)
val responseBody = new CreateTopicsResponse(responseData)
trace(s"Sending create topics response $responseData for correlation id " +
"${request.header.correlationId} to client ${request.header.clientId}.")
s"${request.header.correlationId} to client ${request.header.clientId}.")
responseBody
}
sendResponseMaybeThrottle(request, createResponse)
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/kafka/zk/KafkaZkClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1751,7 +1751,7 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo

private def reCreate(): Stat = {
val codeAfterDelete = delete()
var codeAfterReCreate = codeAfterDelete
val codeAfterReCreate = codeAfterDelete
debug(s"Result of znode ephemeral deletion at $path is: $codeAfterDelete")
if (codeAfterDelete == Code.OK || codeAfterDelete == Code.NONODE) {
create()
Expand Down Expand Up @@ -1880,6 +1880,7 @@ object KafkaZkClient {
case _ => null
}
SetDataResponse(resultCode, setDataOp.path, ctx, stat, responseMetadata)
case zkOp => throw new IllegalStateException(s"Unexpected zkOp: $zkOp")
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@junrao Please check that this is OK.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ijuma : Yes, this part looks good to me.

}
case null => throw KeeperException.create(resultCode)
case _ => throw new IllegalStateException(s"Cannot unwrap $response because the first zookeeper op is not check op in original MultiRequest")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1274,17 +1274,11 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
waitForLeaderToBecome(partition1, 1)

// topic 2 unchanged
try {
electResult.partitionResult(partition2).get()
fail("topic 2 wasn't requested")
} catch {
case e: ExecutionException =>
val cause = e.getCause
assertTrue(cause.getClass.getName, cause.isInstanceOf[UnknownTopicOrPartitionException])
assertEquals("Preferred leader election for partition \"elect-preferred-leaders-topic-2-0\" was not attempted",
cause.getMessage)
assertEquals(0, currentLeader(partition2))
}
var e = intercept[ExecutionException](electResult.partitionResult(partition2).get()).getCause
assertEquals(classOf[UnknownTopicOrPartitionException], e.getClass)
assertEquals("Preferred leader election for partition \"elect-preferred-leaders-topic-2-0\" was not attempted",
e.getMessage)
assertEquals(0, currentLeader(partition2))

// meaningful election with null partitions
electResult = client.electPreferredLeaders(null)
Expand All @@ -1298,17 +1292,11 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
val unknownPartition = new TopicPartition("topic-does-not-exist", 0)
electResult = client.electPreferredLeaders(asList(unknownPartition))
assertEquals(Set(unknownPartition).asJava, electResult.partitions.get)
try {
electResult.partitionResult(unknownPartition).get()
} catch {
case e: Exception =>
val cause = e.getCause
assertTrue(cause.isInstanceOf[UnknownTopicOrPartitionException])
assertEquals("The partition does not exist.",
cause.getMessage)
assertEquals(1, currentLeader(partition1))
assertEquals(1, currentLeader(partition2))
}
e = intercept[ExecutionException](electResult.partitionResult(unknownPartition).get()).getCause
assertEquals(classOf[UnknownTopicOrPartitionException], e.getClass)
assertEquals("The partition does not exist.", e.getMessage)
assertEquals(1, currentLeader(partition1))
assertEquals(1, currentLeader(partition2))

// Now change the preferred leader to 2
changePreferredLeader(prefer2)
Expand All @@ -1318,15 +1306,9 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
assertEquals(Set(unknownPartition, partition1).asJava, electResult.partitions.get)
waitForLeaderToBecome(partition1, 2)
assertEquals(1, currentLeader(partition2))
try {
electResult.partitionResult(unknownPartition).get()
} catch {
case e: Exception =>
val cause = e.getCause
assertTrue(cause.isInstanceOf[UnknownTopicOrPartitionException])
assertEquals("The partition does not exist.",
cause.getMessage)
}
e = intercept[ExecutionException](electResult.partitionResult(unknownPartition).get()).getCause
assertEquals(classOf[UnknownTopicOrPartitionException], e.getClass)
assertEquals("The partition does not exist.", e.getMessage)

// dupe partitions
electResult = client.electPreferredLeaders(asList(partition2, partition2))
Expand All @@ -1338,63 +1320,36 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
changePreferredLeader(prefer1)
// but shut it down...
servers(1).shutdown()
waitUntilTrue (
() => {
val description = client.describeTopics(Set (partition1.topic(), partition2.topic()).asJava).all().get()
return !description.asScala.flatMap{
case (topic, description) => description.partitions().asScala.map(
partition => partition.isr().asScala).flatten
}.exists(node => node.id == 1)
},
"Expect broker 1 to no longer be in any ISR"
)
waitUntilTrue (() => {
val description = client.describeTopics(Set(partition1.topic, partition2.topic).asJava).all().get()
val isr = description.asScala.values.flatMap(_.partitions.asScala.flatMap(_.isr.asScala))
!isr.exists(_.id == 1)
}, "Expect broker 1 to no longer be in any ISR")
Copy link
Copy Markdown
Member Author

@ijuma ijuma Feb 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tombentley after I fixed this, the test started failing. return was causing everything after this not to run. Can you please check what the issue is and fix it? cc @mjsax in case this is a regression and not a test problem.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ijuma : Good find. These are indeed existing bugs. There are 4 references of LeaderNotAvailableException in line 1358, 1372, 1380 and 1390. They all need to be changed to PreferredLeaderNotAvailableException. Once I fixed those locally, the test passes.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Jun! Will fix.


// ... now what happens if we try to elect the preferred leader and it's down?
val shortTimeout = new ElectPreferredLeadersOptions().timeoutMs(10000)
electResult = client.electPreferredLeaders(asList(partition1), shortTimeout)
assertEquals(Set(partition1).asJava, electResult.partitions.get)
try {
electResult.partitionResult(partition1).get()
fail()
} catch {
case e: Exception =>
val cause = e.getCause
assertTrue(cause.getClass.getName, cause.isInstanceOf[LeaderNotAvailableException])
assertTrue(s"Wrong message ${cause.getMessage}", cause.getMessage.contains(
"Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"))
}
e = intercept[ExecutionException](electResult.partitionResult(partition1).get()).getCause
assertEquals(classOf[PreferredLeaderNotAvailableException], e.getClass)
assertTrue(s"Wrong message ${e.getMessage}", e.getMessage.contains(
"Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"))
assertEquals(2, currentLeader(partition1))

// preferred leader unavailable with null argument
electResult = client.electPreferredLeaders(null, shortTimeout)
try {
electResult.partitions.get()
fail()
} catch {
case e: Exception =>
val cause = e.getCause
assertTrue(cause.getClass.getName, cause.isInstanceOf[LeaderNotAvailableException])
}
try {
electResult.partitionResult(partition1).get()
fail()
} catch {
case e: Exception =>
val cause = e.getCause
assertTrue(cause.getClass.getName, cause.isInstanceOf[LeaderNotAvailableException])
assertTrue(s"Wrong message ${cause.getMessage}", cause.getMessage.contains(
"Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"))
}
try {
electResult.partitionResult(partition2).get()
fail()
} catch {
case e: Exception =>
val cause = e.getCause
assertTrue(cause.getClass.getName, cause.isInstanceOf[LeaderNotAvailableException])
assertTrue(s"Wrong message ${cause.getMessage}", cause.getMessage.contains(
"Failed to elect leader for partition elect-preferred-leaders-topic-2-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"))
}
e = intercept[ExecutionException](electResult.partitions.get()).getCause
assertEquals(classOf[PreferredLeaderNotAvailableException], e.getClass)

e = intercept[ExecutionException](electResult.partitionResult(partition1).get()).getCause
assertEquals(classOf[PreferredLeaderNotAvailableException], e.getClass)
assertTrue(s"Wrong message ${e.getMessage}", e.getMessage.contains(
"Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"))

e = intercept[ExecutionException](electResult.partitionResult(partition2).get()).getCause
assertEquals(classOf[PreferredLeaderNotAvailableException], e.getClass)
assertTrue(s"Wrong message ${e.getMessage}", e.getMessage.contains(
"Failed to elect leader for partition elect-preferred-leaders-topic-2-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"))

assertEquals(2, currentLeader(partition1))
assertEquals(2, currentLeader(partition2))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
/**
* Creates N consumers with the same group ID and ensures the group rebalances properly at each step
*/
private def createConsumersWithGroupId(groupId: String, consumerCount: Int, executor: ExecutorService, topic: String = topic): ArrayBuffer[KafkaConsumer[Array[Byte], Array[Byte]]] = {
private def createConsumersWithGroupId(groupId: String, consumerCount: Int, executor: ExecutorService, topic: String): ArrayBuffer[KafkaConsumer[Array[Byte], Array[Byte]]] = {
val stableConsumers = ArrayBuffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
for (_ <- 1.to(consumerCount)) {
val newConsumer = createConsumerWithGroupId(groupId)
Expand Down
4 changes: 2 additions & 2 deletions core/src/test/scala/kafka/tools/CustomDeserializerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@ import java.io.PrintStream
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.Deserializer
import org.hamcrest.CoreMatchers
import org.hamcrest.MatcherAssert._
import org.junit.Test
import org.junit.Assert.assertThat
import org.scalatest.mockito.MockitoSugar

class CustomDeserializer extends Deserializer[String] {
override def configure(configs: java.util.Map[String, _], isKey: Boolean): Unit = {
}

override def deserialize(topic: String, data: Array[Byte]): String = {
assertThat("topic must not be null", topic, CoreMatchers.notNullValue())
assertThat("topic must not be null", topic, CoreMatchers.notNullValue)
new String(data)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import java.util.Properties

import kafka.admin.PreferredReplicaLeaderElectionCommand
import kafka.common.{AdminCommandFailedException, TopicAndPartition}
import kafka.network.RequestChannel
import kafka.security.auth._
Expand All @@ -34,8 +33,7 @@ import org.apache.kafka.common.network.ListenerName
import org.junit.Assert._
import org.junit.{After, Test}

class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness with Logging /*with RackAwareTest*/ {

class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness with Logging {
var servers: Seq[KafkaServer] = Seq()

@After
Expand Down
3 changes: 1 addition & 2 deletions core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
val brokers = List(0)
TestUtils.createBrokersInZk(zkClient, brokers)

val topic = "testTopic"
topicService.createTopic(new TopicCommandOptions(
Array("--partitions", "1", "--replication-factor", "1", "--topic", testTopicName)))

Expand Down Expand Up @@ -570,4 +569,4 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with RackAwareT
assertFalse(TestUtils.grabConsoleOutput(topicService.deleteTopic(escapedCommandOpts)).contains(topic2))
assertTrue(TestUtils.grabConsoleOutput(topicService.deleteTopic(unescapedCommandOpts)).contains(topic2))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
}

def assertCheckArgsExitCode(expected: Int, options: TopicCommandOptions) {
assertExitCode(expected, options.checkArgs)
assertExitCode(expected, options.checkArgs _)
}

def createAndWaitTopic(opts: TopicCommandOptions): Unit = {
Expand Down Expand Up @@ -635,4 +635,4 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
assertTrue(output.contains(testTopicName))
assertFalse(output.contains(Topic.GROUP_METADATA_TOPIC_NAME))
}
}
}
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ class PartitionTest {
case Right(Some(offsetAndTimestamp)) => fail("Should have failed")
case Right(None) => fail("Should have failed")
case Left(e: OffsetNotAvailableException) => // ok
case Left(e: ApiException) => fail("Should have seen OffsetNotAvailableException, saw $e")
case Left(e: ApiException) => fail(s"Should have seen OffsetNotAvailableException, saw $e")
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package kafka.coordinator.group

import kafka.admin.ConsumerGroupCommand.GroupState
import kafka.common.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{ApiError, CreateTopicsRequest, CreateTopicsResponse, MetadataRequest, MetadataResponse}
import org.junit.Assert.{assertEquals, assertFalse, assertNotNull, assertTrue}

import scala.collection.JavaConverters
import scala.collection.JavaConverters._

class AbstractCreateTopicsRequestTest extends BaseRequestTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import kafka.cluster.Partition
import kafka.server.QuotaFactory.UnboundedQuota
import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
import kafka.utils.TestUtils
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
Expand All @@ -33,7 +32,7 @@ import org.apache.kafka.common.requests.{EpochEndOffset, OffsetsForLeaderEpochRe
import org.apache.kafka.common.requests.EpochEndOffset._
import org.apache.kafka.common.utils.SystemTime
import org.easymock.EasyMock._
import org.easymock.{Capture, CaptureType, IAnswer}
import org.easymock.{Capture, CaptureType}
import org.junit.Assert._
import org.junit.Test

Expand All @@ -46,7 +45,6 @@ class ReplicaFetcherThreadTest {
private val t1p1 = new TopicPartition("topic1", 1)
private val t2p1 = new TopicPartition("topic2", 1)

private var toFail = false
private val brokerEndPoint = new BrokerEndPoint(0, "localhost", 1000)

private def offsetAndEpoch(fetchOffset: Long, leaderEpoch: Int = 1): OffsetAndEpoch = {
Expand Down