Kafka-16540: Update partitions if min isr config is changed. #15702
CalvinLiu7947 wants to merge 9 commits into apache:trunk from
@mumrah Can you help take a look?
| void maybeTriggerMinIsrConfigUpdate(Optional<String> topicName) throws InterruptedException, ExecutionException { | ||
| appendWriteEvent("partitionUpdateForMinIsrChange", OptionalLong.empty(), | ||
| () -> replicationControl.getPartitionElrUpdatesForConfigChanges(topicName)).get(); | ||
| } |
There was a problem hiding this comment.
calling .get() on an appendWriteEvent doesn't look right to me. If I understand correctly, the appendWriteEvents are handled in the quorum controller event loop thread.
We would expect replay() to also be called in the event loop thread. So if we trigger an appendWriteEvent and block waiting for the result, it would always time out, since we are blocking the very thread that has to process it.
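To illustrate the concern, here is a toy model in which a plain single-threaded executor stands in for the controller event loop. None of the names below are Kafka APIs; `EventLoopBlockingDemo` and `nestedWaitTimesOut` are made up for this sketch. An event that enqueues a second event on the same loop and then blocks on it can only time out, because the one thread that could run the second event is the thread doing the waiting.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Toy model only: a single-threaded executor plays the role of the event loop.
final class EventLoopBlockingDemo {
    /** Returns true if an event timed out waiting on another event it enqueued on the same loop. */
    static boolean nestedWaitTimesOut(long timeoutMs) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> outer = eventLoop.submit(() -> {
                // Enqueue a second event on the same loop, then block on its result.
                Future<String> inner = eventLoop.submit(() -> "partition change records");
                try {
                    inner.get(timeoutMs, TimeUnit.MILLISECONDS);
                    return false; // not reached: inner cannot start while outer occupies the thread
                } catch (TimeoutException e) {
                    return true; // the only loop thread is busy running this outer event
                }
            });
            return outer.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            eventLoop.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("nested wait timed out: " + nestedWaitTimesOut(200));
    }
}
```

The same shape applies whenever work that runs on the loop waits for a result that only the loop can produce.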
There was a problem hiding this comment.
Got it, we basically only need to call the appendWriteEvents and do not wait for the replay().
There was a problem hiding this comment.
hmm, I think a better way to think about it is that we want to append the min ISR config update atomically with the partition change records. Appending the partition change records once the config change is replayed is difficult to reason about and possibly incorrect. Thinking a bit more about it, triggering a write event from the replay() for the config change record means that every time we reload the metadata log, we would replay the config change record and generate new partition change records.
Perhaps one example to look at is ReplicationControlManager.handleBrokerFenced. When a broker is fenced, we generate a broker registration change record along with the leaderAndIsr partition change records. I assume we want to follow a similar model with the topic configuration change events.
There was a problem hiding this comment.
Makes sense, I misunderstood how the controller events work. Will update. Thanks!
| if (configRecord.name().equals(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)) { | ||
| minIsrRecords.add(configRecord); | ||
| if (Type.forId(configRecord.resourceType()) == Type.TOPIC) { | ||
| if (configRecord.value() == null) topicMap.put(configRecord.resourceName(), configRecord.value()); | ||
| else configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value()); | ||
| } | ||
| } | ||
| } | ||
| }); |
There was a problem hiding this comment.
what is the behavior if the default broker config for min.insync.replicas is changed?
I am not actually sure how that impacts the min.insync.replicas for existing topics.
There was a problem hiding this comment.
If min.insync.replicas is not set at the topic config level, the effective min.insync.replicas of a topic will change if the default broker config is updated.
There was a problem hiding this comment.
Yeah, unfortunately we have a 5-level system:
- topic configs (highest priority)
- node configs (aka "broker" configs, but they also apply to controllers)
- cluster configs (aka default configs for the broker resource)
- static configuration
- static default
The first three levels can change at runtime 🤮
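A minimal sketch of that priority order, assuming each level can be modeled as a simple `Map<String, String>` and that the first level defining the key wins. `LayeredConfigSketch` and `resolve` are hypothetical names for illustration, not Kafka classes; the real controller code resolves configs differently.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not a Kafka class.
final class LayeredConfigSketch {
    static final String MIN_ISR = "min.insync.replicas";

    /** Returns the value from the first (highest-priority) layer that defines the key, or null. */
    static String resolve(String key, List<Map<String, String>> layersHighToLow) {
        for (Map<String, String> layer : layersHighToLow) {
            String value = layer.get(key);
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, String> topic = new HashMap<>();
        Map<String, String> node = new HashMap<>();
        Map<String, String> cluster = new HashMap<>();
        Map<String, String> staticConfig = new HashMap<>();
        Map<String, String> staticDefault = Collections.singletonMap(MIN_ISR, "1");
        List<Map<String, String>> layers =
            Arrays.asList(topic, node, cluster, staticConfig, staticDefault);

        System.out.println(resolve(MIN_ISR, layers)); // 1: only the static default defines it
        cluster.put(MIN_ISR, "2");
        System.out.println(resolve(MIN_ISR, layers)); // 2: the cluster default now applies to this topic
        topic.put(MIN_ISR, "3");
        System.out.println(resolve(MIN_ISR, layers)); // 3: the topic config overrides everything below
    }
}
```

The second step is the case discussed in this thread: a cluster-level change alters the effective value for every topic that has no override of its own.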
| for (ConfigRecord record : minIsrRecords) { | ||
| replayInternal(record, configDataCopy, localSnapshotRegistry); | ||
| } |
There was a problem hiding this comment.
why are we calling replay here?
There was a problem hiding this comment.
This is the challenging part of implementing this PR. Finding the effective min ISR value requires checking topic config -> dynamic broker config -> default broker config -> ...
Let's say the user updates the default broker config:
- All the topics could be affected.
- The effective min ISR values should be recalculated.
- We need to generate the partition change records along with the config change records, which means the ReplicationControlManager can't use the regular methods for the effective min ISR value. The value should be determined by the config records and the current configs.
I found it easier to make a copy of the configs and apply the min ISR updates on the copy. Then let the ReplicationControlManager check all the partitions with the config copy.
| private final TimelineHashMap<ConfigResource, TimelineHashMap<String, String>> configData; | ||
| private final Map<String, Object> staticConfig; | ||
| private final ConfigResource currentController; | ||
| private final MinIsrConfigUpdatePartitionHandler minIsrConfigUpdatePartitionHandler; |
There was a problem hiding this comment.
maybe more of a question for someone with more code ownership of the quorum controller code, but I wonder if it would be preferable to handle generating the replication control manager records in the QuorumController.incrementalAlterConfigs. That would also make it a bit easier to handle validateOnly which we are not currently handling.
| // Because it may require multiple layer look up for the min ISR config value. Build a config data copy and apply | ||
| // the config updates to it. Use this config copy for the min ISR look up. | ||
| Map<ConfigResource, TimelineHashMap<String, String>> pendingConfigData = new HashMap<>(); | ||
| SnapshotRegistry localSnapshotRegistry = new SnapshotRegistry(new LogContext("dummy-config-update")); |
There was a problem hiding this comment.
Maybe I'm missing something, but I don't think this works... pendingConfigData will have some of the new changes you made, but not all of the existing changes. So, for example, perhaps we are changing the cluster config for min topic ISR, but the node config for the current controller node is unchanged. It should take priority, but it won't be in here.
There was a problem hiding this comment.
Yeah, it is not a straightforward change. So the pending config data populated here will be checked together with the existing configs. See how OrderedConfigResolver is used below.
| String getTopicConfigWithPendingChange( | ||
| String topicName, | ||
| String configKey, | ||
| Map<ConfigResource, TimelineHashMap<String, String>> pendingConfigData |
There was a problem hiding this comment.
It seems like there could be some value in turning the pending config change into a class of its own, if we're going to be querying it like this.
There was a problem hiding this comment.
I used an OrderedConfigResolver later to solve the problem that we have to look up both the pending config data and the current config.
| return ApiError.NONE; | ||
| } | ||
|
|
||
| void maybeTriggerPartitionUpdateOnMinIsrChange(List<ApiMessageAndVersion> records) { |
There was a problem hiding this comment.
I guess eventually we will probably want some more general system for responding to configuration changes, and not special-case min.isr. However, we don't have to do that in this PR.
I also understand why you had this function receive a list of records here rather than something fancier (easier to integrate into the existing code, and into the two configuration change paths.)
mumrah
left a comment
There was a problem hiding this comment.
Thanks for the patch, @CalvinConfluent! Left some initial feedback
| return ApiError.NONE; | ||
| } | ||
|
|
||
| void maybeTriggerPartitionUpdateOnMinIsrChange(List<ApiMessageAndVersion> records) { |
There was a problem hiding this comment.
The complexity here is a tad bit high. Can we extract a method for getting the minIsrRecords ConfigRecord from List<ApiMessageAndVersion>?
| }); | ||
|
|
||
| if (minIsrRecords.isEmpty()) return; | ||
| if (topicToMinIsrValueMap.size() == minIsrRecords.size()) { |
There was a problem hiding this comment.
The size comparisons here and below are a little non-obvious (to me at least). Maybe we can set a boolean as we're looping through the records to determine if we hit this branch.
Alternative question, is this optimization helping with performance? We still need the code for the case of overlaying configs from different levels, so having this separate code path just increases complexity.
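A rough sketch of the boolean-flag idea, assuming the min ISR changes have already been extracted into a simple value type. `MinIsrChange`, `ScanPlan`, and `plan` are hypothetical names, not part of the PR; the point is only that the flag is set in the same loop that collects topic names, so no size comparison is needed afterwards.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical types for illustration; the PR works on ConfigRecord instead.
final class MinIsrScanPlanSketch {
    enum ResourceType { TOPIC, BROKER }

    static final class MinIsrChange {
        final ResourceType type;
        final String resourceName;

        MinIsrChange(ResourceType type, String resourceName) {
            this.type = type;
            this.resourceName = resourceName;
        }
    }

    static final class ScanPlan {
        final boolean fullScan;   // true if any change is not scoped to a single topic
        final Set<String> topics; // topics named explicitly by topic-level changes

        ScanPlan(boolean fullScan, Set<String> topics) {
            this.fullScan = fullScan;
            this.topics = topics;
        }
    }

    /** Decides in one pass whether every partition must be inspected or only some topics. */
    static ScanPlan plan(List<MinIsrChange> changes) {
        boolean fullScan = false;
        Set<String> topics = new LinkedHashSet<>();
        for (MinIsrChange change : changes) {
            if (change.type == ResourceType.TOPIC) {
                topics.add(change.resourceName);
            } else {
                fullScan = true;
            }
        }
        return new ScanPlan(fullScan, topics);
    }

    public static void main(String[] args) {
        ScanPlan p = plan(Arrays.asList(
            new MinIsrChange(ResourceType.TOPIC, "orders"),
            new MinIsrChange(ResourceType.BROKER, "")));
        System.out.println("fullScan=" + p.fullScan + " topics=" + p.topics);
    }
}
```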
| if (configRecord.value() != null) topicToMinIsrValueMap.put(configRecord.resourceName(), configRecord.value()); | ||
| else configRemovedTopicMap.put(configRecord.resourceName(), configRecord.value()); |
There was a problem hiding this comment.
style nit: Can you reformat these to not be inline?
| } | ||
|
|
||
| ArrayList<String> topicList = new ArrayList<>(); | ||
| // If all the updates are on the Topic level, we can avoid perform a full scan of the partitions. |
There was a problem hiding this comment.
Similar to the above comment, this size check is hard to grok. Maybe we can compute a boolean to determine if we need to inspect every partition vs just a subset.
In what case do we need to scan all partitions? Only when the cluster-level min.insync.replicas is changed?
| * @param record The ConfigRecord. | ||
| * @param localConfigData The config data is going to be updated. | ||
| */ | ||
| public void replayForPendingConfig( |
There was a problem hiding this comment.
Can we reuse ConfigurationsImage here instead of adding another place where we are applying records? I think it should be reasonably straightforward to construct a ConfigurationsImage with the in-memory state (localConfigData) and then replay records to get a ConfigurationsDelta.
| configs = new ArrayList<>(); | ||
| configs.add(map); | ||
| } | ||
| public boolean containsKey(String key) { |
There was a problem hiding this comment.
style nit: whitespace before method signature
There was a problem hiding this comment.
docs: can you document this method's behavior here?
| } | ||
| public boolean containsKey(String key) { | ||
| for (Map<String, ?> config : configs) { | ||
| if (config.containsKey(key)) return config.get(key) != null; |
There was a problem hiding this comment.
If a later config map has a non-null value for the key, I think we could mistakenly return false here.
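One way to address this, sketched under the assumption that a null entry should simply be skipped so the lookup continues into later maps. Whether a null in an earlier map is instead meant to mask later maps is not stated in this thread, so treat that as an open question. `OrderedLookupSketch` and `addConfigs` are hypothetical names, not the PR's `OrderedConfigResolver`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the resolver under review; names are illustrative.
final class OrderedLookupSketch {
    private final List<Map<String, ?>> configs = new ArrayList<>();

    OrderedLookupSketch(Map<String, ?> first) {
        configs.add(first);
    }

    /** Appends a lower-priority map to consult after the ones already added. */
    void addConfigs(Map<String, ?> next) {
        configs.add(next);
    }

    /** Returns true if any map, in order, holds a non-null value for the key. */
    boolean containsKey(String key) {
        for (Map<String, ?> config : configs) {
            // A null entry does not end the search; later maps may still define the key.
            if (config.get(key) != null) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, String> pending = new HashMap<>();
        pending.put("min.insync.replicas", null);
        Map<String, String> current = new HashMap<>();
        current.put("min.insync.replicas", "2");

        OrderedLookupSketch lookup = new OrderedLookupSketch(pending);
        lookup.addConfigs(current);
        System.out.println(lookup.containsKey("min.insync.replicas")); // true
    }
}
```

With the early return from the snippet under review, the same input would yield false because the first map contains the key with a null value.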
| Map<String, String> pendingControllerConfig = | ||
| pendingConfigData.containsKey(currentController) ? pendingConfigData.get(currentController) : Collections.emptyMap(); | ||
| return configSchema.resolveEffectiveTopicConfigs( | ||
| new OrderedConfigResolver(staticConfig), |
There was a problem hiding this comment.
Hm.. what happens if the controller has different static configs from the broker?
| int currentMinIsr = defaultMinIsr; | ||
| String minIsrConfig = configurationControl.getTopicConfig(topicName, MIN_IN_SYNC_REPLICAS_CONFIG); | ||
| String minIsrConfig; | ||
| if (getTopicMinIsrConfig == null) { |
There was a problem hiding this comment.
Can we split the method into two signatures instead of having the null?
E.g., int getTopicEffectiveMinIsr(String topicName) and int getTopicEffectiveMinIsr(String topicName, Function<String, String> getTopicMinIsrConfig)
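A minimal sketch of the two-signature shape, assuming the one-argument overload delegates to the two-argument one with a lookup against the current configs. The class name, constructor, and `currentConfigLookup` field are hypothetical; the field stands in for `configurationControl.getTopicConfig(topicName, MIN_IN_SYNC_REPLICAS_CONFIG)`, and any further adjustment the real method applies to the value is omitted here.

```java
import java.util.function.Function;

// Hypothetical container for illustration; the real method lives in the replication control code.
final class EffectiveMinIsrSketch {
    private final int defaultMinIsr;
    // Stand-in for looking up the topic's min.insync.replicas in the current configs.
    private final Function<String, String> currentConfigLookup;

    EffectiveMinIsrSketch(int defaultMinIsr, Function<String, String> currentConfigLookup) {
        this.defaultMinIsr = defaultMinIsr;
        this.currentConfigLookup = currentConfigLookup;
    }

    /** Uses the current configs; callers never pass null. */
    int getTopicEffectiveMinIsr(String topicName) {
        return getTopicEffectiveMinIsr(topicName, currentConfigLookup);
    }

    /** Uses a caller-supplied lookup, for example one that overlays pending changes. */
    int getTopicEffectiveMinIsr(String topicName, Function<String, String> getTopicMinIsrConfig) {
        String minIsrConfig = getTopicMinIsrConfig.apply(topicName);
        return minIsrConfig == null ? defaultMinIsr : Integer.parseInt(minIsrConfig);
    }

    public static void main(String[] args) {
        EffectiveMinIsrSketch sketch =
            new EffectiveMinIsrSketch(1, topic -> "orders".equals(topic) ? "2" : null);
        System.out.println(sketch.getTopicEffectiveMinIsr("orders"));               // 2
        System.out.println(sketch.getTopicEffectiveMinIsr("payments"));             // 1
        System.out.println(sketch.getTopicEffectiveMinIsr("orders", topic -> "3")); // 3
    }
}
```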
| return new BrokersToIsrs.PartitionsOnReplicaIterator(topicMap, false); | ||
| } | ||
|
|
||
| BrokersToIsrs.PartitionsOnReplicaIterator partitionsWithElr() { |
There was a problem hiding this comment.
This isn't acceptable from a performance point of view, since it will be O(num_partitions)
Closing this PR in favor of #18148 because too much has changed since this PR was opened.
https://issues.apache.org/jira/browse/KAFKA-16540
If the min isr config is changed, we need to update the partitions with ELR if possible.