From 5611fd872eed98f02ea805eabaa170c9b09c81a5 Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Thu, 23 Jul 2020 18:11:36 +0300 Subject: [PATCH 1/7] KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates --- core/src/main/scala/kafka/cluster/Partition.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 9e8edaa5ea4e2..78c1d13dacd9f 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -678,11 +678,13 @@ class Partition(val topicPartition: TopicPartition, isr: Set[Int], addingReplicas: Seq[Int], removingReplicas: Seq[Int]): Unit = { - remoteReplicasMap.clear() + val replicaSet = assignment.toSet + val removedReplicas = remoteReplicasMap.keys -- replicaSet + assignment .filter(_ != localBrokerId) .foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition))) - + removedReplicas.foreach(remoteReplicasMap.remove) if (addingReplicas.nonEmpty || removingReplicas.nonEmpty) assignmentState = OngoingReassignmentState(addingReplicas, removingReplicas, assignment) else From 664b7cdf60377c54cecf8b36b3a81511132ef008 Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Thu, 23 Jul 2020 18:35:32 +0300 Subject: [PATCH 2/7] Avoid using SetOps -- trait method The given method is deprecated in Scala 2.13 --- core/src/main/scala/kafka/cluster/Partition.scala | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 78c1d13dacd9f..6da782dfac5ef 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -678,13 +678,14 @@ class Partition(val topicPartition: TopicPartition, isr: Set[Int], addingReplicas: Seq[Int], removingReplicas: Seq[Int]): Unit = { - val replicaSet = assignment.toSet - val removedReplicas = remoteReplicasMap.keys -- replicaSet + val newRemoteReplicas = assignment.filter(_ != localBrokerId) + val removedReplicas = remoteReplicasMap.keys.filter(!newRemoteReplicas.contains(_)) - assignment - .filter(_ != localBrokerId) - .foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition))) + // due to code paths accessing remoteReplicasMap without a lock, + // first add the new replicas and then remove the old ones + newRemoteReplicas.foreach(id => remoteReplicasMap.putIfNotExists(id, new Replica(id, topicPartition))) removedReplicas.foreach(remoteReplicasMap.remove) + if (addingReplicas.nonEmpty || removingReplicas.nonEmpty) assignmentState = OngoingReassignmentState(addingReplicas, removingReplicas, assignment) else From f6d93406d7563592f8c516b0b8c8d13c7429c290 Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Thu, 23 Jul 2020 20:02:49 +0300 Subject: [PATCH 3/7] Prefer Pool#getAndMaybePut to avoid needless Replica instantiations --- core/src/main/scala/kafka/cluster/Partition.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 6da782dfac5ef..895106a2e1b8b 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -683,7 +683,7 @@ class Partition(val topicPartition: TopicPartition, // due to code paths accessing remoteReplicasMap without a lock, // first add the new replicas and then remove the old ones - newRemoteReplicas.foreach(id => remoteReplicasMap.putIfNotExists(id, new Replica(id, topicPartition))) + newRemoteReplicas.foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition))) removedReplicas.foreach(remoteReplicasMap.remove) if (addingReplicas.nonEmpty || removingReplicas.nonEmpty) From 12016f1d9ad55482edf89f7315191f131dc1dac6 Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Thu, 23 Jul 2020 20:04:19 +0300 Subject: [PATCH 4/7] Add test to catch the race condition --- .../kafka/cluster/PartitionLockTest.scala | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala index 403eebec30dfb..3427aa25ecaf7 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala @@ -36,6 +36,7 @@ import org.mockito.ArgumentMatchers import org.mockito.Mockito.{mock, when} import scala.jdk.CollectionConverters._ +import scala.concurrent.duration._ /** * Verifies that slow appends to log don't block request threads processing replica fetch requests. @@ -116,6 +117,56 @@ class PartitionLockTest extends Logging { future.get(15, TimeUnit.SECONDS) } + /** + * Concurrently calling updateAssignmentAndIsr should always ensure that non-lock access + * to the inner remoteReplicaMap (accessed by getReplica) cannot see an intermediate state + * where replicas present both in the old and new assignment are missing + */ + @Test + def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = { + val active = new AtomicBoolean(true) + val replicaToCheck = 3 + val firstReplicaSet = Seq[Integer](3, 4, 5).asJava + val secondReplicaSet = Seq[Integer](1, 2, 3).asJava + def partitionState(replicas: java.util.List[Integer]): LeaderAndIsrPartitionState = new LeaderAndIsrPartitionState() + .setControllerEpoch(1) + .setLeader(replicas.get(0)) + .setLeaderEpoch(1) + .setIsr(replicas) + .setZkVersion(1) + .setReplicas(replicas) + .setIsNew(true) + val offsetCheckpoints: OffsetCheckpoints = mock(classOf[OffsetCheckpoints]) + // Update replica set synchronously first to avoid race conditions + partition.makeLeader(partitionState(secondReplicaSet), offsetCheckpoints) + assertTrue(s"Expected replica $replicaToCheck to be defined", partition.getReplica(replicaToCheck).isDefined) + + var i = 0 + val future = executorService.submit((() => { + // Flip assignment between two replica sets + while (active.get) { + val replicas = if (i % 2 == 0) { + firstReplicaSet + } else { + secondReplicaSet + } + + partition.makeLeader(partitionState(replicas), offsetCheckpoints) + + i += 1 + Thread.sleep(1) // just to avoid tight loop + } + }): Runnable) + + val deadline = 5.seconds.fromNow + while(deadline.hasTimeLeft()) { + assertTrue(s"Expected replica $replicaToCheck to be defined", partition.getReplica(replicaToCheck).isDefined) + } + active.set(false) + future.get(5, TimeUnit.SECONDS) + assertTrue(s"Expected replica $replicaToCheck to be defined", partition.getReplica(replicaToCheck).isDefined) + } + /** * Perform concurrent appends and replica fetch requests that don't require write lock to * update follower state. Release sufficient append permits to complete all except one append. From 21fb575aa28581f5b40953726dc8a9868cc46867 Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Thu, 23 Jul 2020 20:18:28 +0300 Subject: [PATCH 5/7] Introduce Pool#removeAll helper method to improve readability --- .../main/scala/kafka/cluster/Partition.scala | 2 +- core/src/main/scala/kafka/utils/Pool.scala | 2 + .../scala/unit/kafka/utils/PoolTest.scala | 41 +++++++++++++++++++ 3 files changed, 44 insertions(+), 1 deletion(-) create mode 100644 core/src/test/scala/unit/kafka/utils/PoolTest.scala diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 895106a2e1b8b..fb0576ee5648c 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -684,7 +684,7 @@ class Partition(val topicPartition: TopicPartition, // due to code paths accessing remoteReplicasMap without a lock, // first add the new replicas and then remove the old ones newRemoteReplicas.foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition))) - removedReplicas.foreach(remoteReplicasMap.remove) + remoteReplicasMap.removeAll(removedReplicas) if (addingReplicas.nonEmpty || removingReplicas.nonEmpty) assignmentState = OngoingReassignmentState(addingReplicas, removingReplicas, assignment) diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala index 0a1531ba4ad04..6d3a3f0969bb3 100644 --- a/core/src/main/scala/kafka/utils/Pool.scala +++ b/core/src/main/scala/kafka/utils/Pool.scala @@ -69,6 +69,8 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] { def remove(key: K, value: V): Boolean = pool.remove(key, value) + def removeAll(keys: Iterable[K]): Unit = pool.keySet().removeAll(keys.asJavaCollection) + def keys: Set[K] = pool.keySet.asScala def values: Iterable[V] = pool.values.asScala diff --git a/core/src/test/scala/unit/kafka/utils/PoolTest.scala b/core/src/test/scala/unit/kafka/utils/PoolTest.scala new file mode 100644 index 0000000000000..b4334f979ba80 --- /dev/null +++ b/core/src/test/scala/unit/kafka/utils/PoolTest.scala @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package unit.kafka.utils + +import kafka.utils.Pool +import org.junit.Assert.assertEquals +import org.junit.Test + + +class PoolTest { + @Test + def testRemoveAll(): Unit = { + val pool = new Pool[Int, String] + pool.put(1, "1") + pool.put(2, "2") + pool.put(3, "3") + + assertEquals(3, pool.size) + + pool.removeAll(Seq(1, 2)) + assertEquals(1, pool.size) + assertEquals("3", pool.get(3)) + pool.removeAll(Seq(3)) + assertEquals(0, pool.size) + } +} From c13e73849e4bb1c322aaa099432a4b34401a8085 Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Thu, 23 Jul 2020 20:22:33 +0300 Subject: [PATCH 6/7] Move PoolTest to same package as kafka.utils.Pool --- core/src/test/scala/unit/kafka/utils/PoolTest.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/test/scala/unit/kafka/utils/PoolTest.scala b/core/src/test/scala/unit/kafka/utils/PoolTest.scala index b4334f979ba80..74751806a11ef 100644 --- a/core/src/test/scala/unit/kafka/utils/PoolTest.scala +++ b/core/src/test/scala/unit/kafka/utils/PoolTest.scala @@ -15,9 +15,8 @@ * limitations under the License. */ -package unit.kafka.utils +package kafka.utils -import kafka.utils.Pool import org.junit.Assert.assertEquals import org.junit.Test From 0f813d69b07fb5cf8a85587576652e4a55cc7f3b Mon Sep 17 00:00:00 2001 From: Stanislav Kozlovski Date: Thu, 23 Jul 2020 21:27:03 +0300 Subject: [PATCH 7/7] Reduce test timeout and address stylistic comments --- core/src/main/scala/kafka/utils/Pool.scala | 2 +- .../test/scala/unit/kafka/cluster/PartitionLockTest.scala | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala index 6d3a3f0969bb3..d64ff5d6538ff 100644 --- a/core/src/main/scala/kafka/utils/Pool.scala +++ b/core/src/main/scala/kafka/utils/Pool.scala @@ -69,7 +69,7 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] { def remove(key: K, value: V): Boolean = pool.remove(key, value) - def removeAll(keys: Iterable[K]): Unit = pool.keySet().removeAll(keys.asJavaCollection) + def removeAll(keys: Iterable[K]): Unit = pool.keySet.removeAll(keys.asJavaCollection) def keys: Set[K] = pool.keySet.asScala diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala index 3427aa25ecaf7..8dd3b53f81fac 100644 --- a/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala +++ b/core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala @@ -128,7 +128,7 @@ class PartitionLockTest extends Logging { val replicaToCheck = 3 val firstReplicaSet = Seq[Integer](3, 4, 5).asJava val secondReplicaSet = Seq[Integer](1, 2, 3).asJava - def partitionState(replicas: java.util.List[Integer]): LeaderAndIsrPartitionState = new LeaderAndIsrPartitionState() + def partitionState(replicas: java.util.List[Integer]) = new LeaderAndIsrPartitionState() .setControllerEpoch(1) .setLeader(replicas.get(0)) .setLeaderEpoch(1) @@ -141,8 +141,8 @@ class PartitionLockTest extends Logging { partition.makeLeader(partitionState(secondReplicaSet), offsetCheckpoints) assertTrue(s"Expected replica $replicaToCheck to be defined", partition.getReplica(replicaToCheck).isDefined) - var i = 0 val future = executorService.submit((() => { + var i = 0 // Flip assignment between two replica sets while (active.get) { val replicas = if (i % 2 == 0) { @@ -158,8 +158,8 @@ class PartitionLockTest extends Logging { } }): Runnable) - val deadline = 5.seconds.fromNow - while(deadline.hasTimeLeft()) { + val deadline = 1.seconds.fromNow + while (deadline.hasTimeLeft()) { assertTrue(s"Expected replica $replicaToCheck to be defined", partition.getReplica(replicaToCheck).isDefined) } active.set(false)