KAFKA-8237; Untangle TopicDeleteManager and add test cases#6588
KAFKA-8237; Untangle TopicDeleteManager and add test cases#6588hachikuji merged 10 commits intoapache:trunkfrom
Conversation
jsancio
left a comment
There was a problem hiding this comment.
Thanks for the PR @hachikuji. This looks great and it is for sure an improvement.
Feel free to ignore my renaming suggestions.
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package kafka.controller |
There was a problem hiding this comment.
Very cool. I was going to do something similar. I need this for my changes as I am planning to change the signature to some of these methods. I think I'll wait until we merge this PR.
|
|
||
| def leaderForReassign(partition: TopicPartition, | ||
| leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, | ||
| controllerContext: ControllerContext): ElectionResult = { |
|
|
||
| def leaderForPreferredReplica(partition: TopicPartition, | ||
| leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, | ||
| controllerContext: ControllerContext): ElectionResult = { |
|
|
||
| private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None) | ||
| val controllerContext = new ControllerContext | ||
| var controllerChannelManager: ControllerChannelManager = _ |
| def leaderForOffline(partition: TopicPartition, | ||
| leaderIsrAndControllerEpochOpt: Option[LeaderIsrAndControllerEpoch], | ||
| uncleanLeaderElectionEnabled: Boolean, | ||
| controllerContext: ControllerContext): ElectionResult = { |
There was a problem hiding this comment.
Yeah, I am not sure here either. I think in some cases we have unnecessary "batching" APIs or method signatures given that we batch things in the ControllerChannelManager. Maybe we can work toward that code structure in the controller. Where internally we deal with one partition, replica, topic, broker, etc at a time and perform batching at the edges (IO inputs and outputs).
|
retest this please |
junrao
left a comment
There was a problem hiding this comment.
@hachikuji : Thanks for the cleanup patch. Great improvement. A few comments below.
| brokerIds.filter(_ >= 0).foreach { brokerId => | ||
| def callback(stopReplicaResponse: AbstractResponse): Unit = { | ||
| if (deletePartition) | ||
| controller.eventManager.put(controller.TopicDeletionStopReplicaResponseReceived(stopReplicaResponse, brokerId)) |
There was a problem hiding this comment.
We want to be a bit careful here. Partition reassignment will send StopReplicaRequest with deletePartition=true too. In that case, we don't want to trigger the topic deletion logic.
There was a problem hiding this comment.
I added a check for whether the topic is undergoing deletion. Let me know if that seems sufficient. It would be nice to avoid the awkward callback through the ReplicaStateMachine if possible.
| assertEquals(Set("foo"), controllerContext.topicsWithDeletionStarted) | ||
| assertEquals(Set("foo"), controllerContext.topicsIneligibleForDeletion) | ||
| assertEquals(onlineReplicas, controllerContext.replicasInState("foo", ReplicaDeletionSuccessful)) | ||
| assertEquals(offlineReplicas, controllerContext.replicasInState("foo", OfflineReplica)) |
There was a problem hiding this comment.
This is an existing issue. At this stage, it seems that offlineReplicas should remain in ReplicaDeletionIneligible.
There was a problem hiding this comment.
Note I created https://issues.apache.org/jira/browse/KAFKA-8283 for this and the other issue below.
| assertEquals(offlineReplicas, controllerContext.replicasInState("foo", ReplicaDeletionIneligible)) | ||
| assertEquals(onlineReplicas, controllerContext.replicasInState("foo", ReplicaDeletionStarted)) | ||
|
|
||
| // Broker 2 is restarted. The offline replicas remain ineligable |
There was a problem hiding this comment.
Yes, this does seem to be an existing issue. It seems that deletionManager.resumeDeletionForTopics should bring offlineReplicas to ReplicaDeletionStarted.
There was a problem hiding this comment.
Ack. I will file a JIRA and we can address this separately.
junrao
left a comment
There was a problem hiding this comment.
@hachikuji : Thanks for the updated PR. LGTM. Just one more comment below.
| def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topicPartition: TopicPartition, deletePartition: Boolean): Unit = { | ||
| brokerIds.filter(_ >= 0).foreach { brokerId => | ||
| def callback(stopReplicaResponse: AbstractResponse): Unit = { | ||
| if (deletePartition && controllerContext.isTopicQueuedUpForDeletion(topicPartition.topic)) |
There was a problem hiding this comment.
When there is a pending partition reassignment task, a deleted topic will be queued up but not executed until the reassignment completes. So, we probably want to further check that isTopicIneligibleForDeletion(topic) is false.
junrao
left a comment
There was a problem hiding this comment.
@hachikuji : Thanks for the new patch. A couple of more comments below.
| brokerIds.filter(_ >= 0).foreach { brokerId => | ||
| def callback(stopReplicaResponse: AbstractResponse): Unit = { | ||
| if (deletePartition) | ||
| controller.eventManager.put(controller.StopAndDeleteReplicaResponseReceived(stopReplicaResponse, brokerId)) |
There was a problem hiding this comment.
This is actually trickier than I thought. In sendRequestsToBrokers(), we only group replicas with no deletion and no callback in a single StopReplicaRequest. By always including a callback, it disables batching in StopReplicaRequest.
|
|
||
| // Here we are only handling partitions which are part of topic deletion | ||
| val responseMap = stopReplicaResponse.responses.asScala | ||
| .filterKeys(tp => controllerContext.isTopicQueuedUpForDeletion(tp.topic)) |
There was a problem hiding this comment.
We probably also want to check that the topic is eligible for deletion.
There was a problem hiding this comment.
I have a little trouble following the state machine. I think we will transition to ReplicaDeletionIneligible if either we get a StopReplica response with an error or if the replica fails during deletion. I think it would be possible for a pending StopReplica response to be received after the broker had been marked offline, so I think the suggestion makes sense. Just to be clear though, this seems to be an existing problem?
There was a problem hiding this comment.
I was concerned about the case when a topic is queued for deletion behind a pending partition reassignment. In that case, the pending reassignment could send StopReplicaRequest with deletePartition=true. In the current logic, there is no callback in that case. So, the replicas won't be transitioned to ReplicaDeletionIneligible or ReplicaDeletionSuccessful until the topic is eligible for deletion. I was just trying to preserve that logic now that we add a callback as long as deletePartition=true.
There was a problem hiding this comment.
Got it. That makes sense.
There was a problem hiding this comment.
@junrao Unfortunately, we have to skip the topic ineligible check in order to continue to completeReplicaDeletion while another replica is offline. Some test cases are failing because of this. I think we should be installing a callback anyway on reassignment so that we can retry, but we can leave this for a future patch. For now, I will see if I can find a nice way to preserve existing logic.
junrao
left a comment
There was a problem hiding this comment.
@hachikuji : Thanks for the latest PR. LGTM
|
I noticed a discussion about callbacks and batching. See #6515 |
|
@ijuma Thanks for the link. I think this patch achieves the same effect. I solve it by removing the callback parameter and checking the deletion flag directly. |
| val currentState = controllerContext.replicaState(replica) | ||
| logSuccessfulTransition(replicaId, replica.topicPartition, currentState, ReplicaDeletionStarted) | ||
| controllerContext.putReplicaState(replica, ReplicaDeletionStarted) | ||
| val topicDeletionInProgress = controllerContext.isTopicQueuedUpForDeletion(replica.topicPartition.topic) |
There was a problem hiding this comment.
I think we need to check if a topic is eligible for deletion. A topic can be queued up for deletion but is blocked behind a pending partition reassignment, which will also transition old replicas to ReplicaDeletionStarted at some point.
There was a problem hiding this comment.
The problem seems to be that we are using the topicsIneligibleForDeletion set for two unrelated purposes. We use it to track topics which are awaiting reassignment before deletion and we also use it to indicate topics which have had some replica deletion failure. The problem is that we may transition a replica to ReplicaDeletionStarted in both cases. It's becoming tempting to add back that nasty callback, but let me try one more time to salvage this.
junrao
left a comment
There was a problem hiding this comment.
@hachikuji : Thanks for the latest update. LGTM
* ak/trunk: (42 commits) KAFKA-8134: `linger.ms` must be a long KAFKA-7779; Avoid unnecessary loop iteration in leastLoadedNode (apache#6081) MINOR: Update Gradle to 5.4.1 and update its plugins (apache#6436) MINOR: improve Session expiration notice (apache#6618) KAFKA-8029: In memory session store (apache#6525) MINOR: In-memory stores cleanup (apache#6595) KAFKA-7862 & KIP-345 part-one: Add static membership logic to JoinGroup protocol (apache#6177) KAFKA-8254: Pass Changelog as Topic in Suppress Serdes (apache#6602) KAFKA-7903: automatically generate OffsetCommitRequest (apache#6583) KAFKA-8291 : System test fix (apache#6637) MINOR: Do not log retriable offset commit exceptions as errors (apache#5904) MINOR: Fix log message error of loadTransactionMetadata (apache#6571) MINOR: Fix 404 security features links (apache#6634) MINOR: Remove an unnecessary character from broker's startup log MINOR: Make LogCleaner.shouldRetainRecord more readable (apache#6590) MINOR: Remove implicit return statement (apache#6629) KAFKA-8237; Untangle TopicDeleteManager and add test cases (apache#6588) KAFKA-8227 DOCS Fixed missing links duality of streams tables (apache#6625) MINOR: reformat settings.gradle to be more readable (apache#6621) MINOR: Correct RestServerTest formatting ... Conflicts: build.gradle settings.gradle
The controller maintains state across `ControllerContext`, `PartitionStateMachine`, `ReplicaStateMachine`, and `TopicDeletionManager`. None of this state is actually isolated from the rest. For example, topics undergoing deletion are intertwined with the partition and replica states. As a consequence of this, each of these components tends to be dependent on all the rest, which makes testing and reasoning about the system difficult. This is a first step toward untangling all the state. This patch moves it all into `ControllerContext` and removes many of the circular dependencies. So far, this is mostly a direct translation, but in the future we can add additional validation in `ControllerContext` to make sure that state is maintained consistently. Additionally, this patch adds several mock objects to enable easier testing: `MockReplicaStateMachine` and `MockPartitionStateMachine`. These have simplified logic for updating the current state. This is used to create some new test cases for `TopicDeletionManager`. Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>, Jun Rao <junrao@gmail.com>
The controller maintains a bunch of state across
ControllerContext,PartitionStateMachine,ReplicaStateMachine, andTopicDeletionManager. None of this state is actually isolated from the rest. For example, topics undergoing deletion are intertwined with the partition and replica states. As a consequence of this, each of these components tends to be dependent on all the rest, which makes testing and reasoning about the system difficult. This is a first step toward untangling all the state. I have simply moved it all intoControllerContextand removed many of the circular dependencies. So far, this is mostly a direct translation, but in the future we can add additional validation inControllerContextto make sure that state is maintained consistently.Additionally, I have created several mock objects to enable easier testing:
MockReplicaStateMachineandMockPartitionStateMachine. These have simplified logic for updating the current state. This is used to create some new test cases forTopicDeletionManager. I found that I had to change the valid previous state ofReplicaDeletionIneligibleto includeOfflineReplicato get the new tests to pass, but please make sure there is not a bug in the test case.Committer Checklist (excluded from commit message)