Skip to content

KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates#9065

Merged
ijuma merged 7 commits intoapache:trunkfrom
stanislavkozlovski:KAFKA-10301
Jul 24, 2020
Merged

KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates#9065
ijuma merged 7 commits intoapache:trunkfrom
stanislavkozlovski:KAFKA-10301

Conversation

@stanislavkozlovski
Copy link
Copy Markdown
Contributor

We would previously update the map by adding the new replicas to the map and then removing the old ones. During a recent refactoring, we changed the logic to first clear the map and then add all the replicas to it.

While this is done in a write lock, not all callers that access the map structure use a lock. It is safer to revert to the previous behavior of showing the intermediate state of the map with extra replicas, rather than an intermediate state of the map with no replicas.

@stanislavkozlovski
Copy link
Copy Markdown
Contributor Author

cc @ijuma

.filter(_ != localBrokerId)
.foreach(id => remoteReplicasMap.getAndMaybePut(id, new Replica(id, topicPartition)))

removedReplicas.foreach(remoteReplicasMap.remove)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I decided to not get fancy with refactorings - this is literally the old code (

removedReplicas.foreach(remoteReplicasMap.remove)
))

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would remoteReplicasMap --= removedReplicas work here?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, this is a Pool, so we would have to add a removeAll method. Seems easy enough though since it can call the relevant method in ConcurrentMap.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remoteReplicasMap --= removedReplicas doesn't compile - the remoteReplicasMap is using a Kafka Pool class which itself is using a Java Map and I don't think they support the --= notation

@ijuma
Copy link
Copy Markdown
Member

ijuma commented Jul 23, 2020

ok to test

The given method is deprecated in Scala 2.13
@stanislavkozlovski
Copy link
Copy Markdown
Contributor Author

Currently working on introducing a test case for this

* where replicas present both in the old and new assignment are missing
*/
@Test
def testGetReplicaWithUpdateAssignmentAndIsr(): Unit = {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fails incredibly quickly 100/100 times without the Partition.scala changes.

@ijuma
Copy link
Copy Markdown
Member

ijuma commented Jul 23, 2020

ok to test

* limitations under the License.
*/

package unit.kafka.utils
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove unit.

Copy link
Copy Markdown
Member

@ijuma ijuma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. A few minor comments below.


def remove(key: K, value: V): Boolean = pool.remove(key, value)

def removeAll(keys: Iterable[K]): Unit = pool.keySet().removeAll(keys.asJavaCollection)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: () is not needed.

val replicaToCheck = 3
val firstReplicaSet = Seq[Integer](3, 4, 5).asJava
val secondReplicaSet = Seq[Integer](1, 2, 3).asJava
def partitionState(replicas: java.util.List[Integer]): LeaderAndIsrPartitionState = new LeaderAndIsrPartitionState()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to repeat LeaderAndIsrPartitionState twice.

partition.makeLeader(partitionState(secondReplicaSet), offsetCheckpoints)
assertTrue(s"Expected replica $replicaToCheck to be defined", partition.getReplica(replicaToCheck).isDefined)

var i = 0
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be inside the thread state?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, nice catch

}
}): Runnable)

val deadline = 5.seconds.fromNow
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5 seconds is quite a bit. Can it be lower?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so. I opted for 5s as I saw the other tests had up to 15s of waits for futures. Let me see if 1s can go

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lowered to 1s

}): Runnable)

val deadline = 5.seconds.fromNow
while(deadline.hasTimeLeft()) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: space missing after while.

@ijuma
Copy link
Copy Markdown
Member

ijuma commented Jul 24, 2020

ok to test

@ijuma
Copy link
Copy Markdown
Member

ijuma commented Jul 24, 2020

retest this please

Copy link
Copy Markdown
Member

@ijuma ijuma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks. Will wait for the tests to pass.

@ijuma
Copy link
Copy Markdown
Member

ijuma commented Jul 24, 2020

ok to test

@ijuma
Copy link
Copy Markdown
Member

ijuma commented Jul 24, 2020

retest this please

@ijuma
Copy link
Copy Markdown
Member

ijuma commented Jul 24, 2020

One PR build was started here:

https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/3515/

@ijuma
Copy link
Copy Markdown
Member

ijuma commented Jul 24, 2020

Looks like the last message somehow made the Jenkins status to be updated in the PR again.

@ijuma
Copy link
Copy Markdown
Member

ijuma commented Jul 24, 2020

One build passed, one failed due to environmental reasons, one had a single flaky failure:

org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testOneWayReplicationWithAutorOffsetSync1

I ran the full test suite locally and it passed as well. Merging to trunk and 2.6. cc @rhauch

@ijuma ijuma merged commit fa6e5b8 into apache:trunk Jul 24, 2020
ijuma pushed a commit that referenced this pull request Jul 24, 2020
…n assignment updates (#9065)

We would previously update the map by adding the new replicas to the map and then removing the old ones.
During a recent refactoring, we changed the logic to first clear the map and then add all the replicas to it.

While this is done in a write lock, not all callers that access the map structure use a lock. It is safer to revert to
the previous behavior of showing the intermediate state of the map with extra replicas, rather than an
intermediate state of the map with no replicas.

Reviewers: Ismael Juma <ismael@juma.me.uk>
ijuma added a commit to ijuma/kafka that referenced this pull request Nov 17, 2020
…t-for-generated-requests

* apache-github/trunk: (148 commits)
  MINOR: remove NewTopic#NO_PARTITIONS and NewTopic#NO_REPLICATION_FACTOR as they are duplicate to CreateTopicsRequest#NO_NUM_PARTITIONS and CreateTopicsRequest#NO_REPLICATION_FACTOR (apache#9077)
  MINOR: Remove staticmethod tag to be able to use logger of instance (apache#9086)
  MINOR: Adjust 'release.py' script to use shell when using gradlewAll and PGP signing, which were required to build the 2.6.0 RCs (apache#9045)
  MINOR: Update dependencies for Kafka 2.7 (part 1) (apache#9082)
  MINOR: INFO log4j when request re-join (apache#9068)
  MINOR: Recommend Java 11 (apache#9080)
  KAFKA-10306: GlobalThread should fail on InvalidOffsetException (apache#9075)
  KAFKA-10158: Fix flaky testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress (apache#9022)
  MINOR: code cleanup for `VOut` inconsistent naming (apache#8907)
  KAFKA-10246 : AbstractProcessorContext topic() throws NPE (apache#9034)
  KAFKA-10305: Print usage when parsing fails for ConsumerPerformance (apache#9071)
  MINOR: removed incorrect deprecation annotations (apache#9061)
  MINOR: speed up release script (apache#9070)
  MINOR: add task ':streams:testAll' (apache#9073)
  KAFKA-10301: Do not clear Partition#remoteReplicasMap during partition assignment updates (apache#9065)
  KAFKA-10268: dynamic config like "--delete-config log.retention.ms" doesn't work (apache#9051)
  KAFKA-10300 fix flaky core/group_mode_transactions_test.py (apache#9059)
  MINOR: Publish metrics package in the javadoc (apache#9036)
  KAFKA-8264: decrease the record size for flaky test
  KAFKA-5876: Add new exception types for Interactive Queries (apache#8200)
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants