KAFKA-13255: use exclude filter for new topics#11401
Conversation
|
Thanks for the PR. Can you add a test for this fix? |
|
@mimaison Added a test case for exclude filter. Note, even though I could add only some param with random name to exclude, I decided to keep all the values from default filter as an example how to define proper full list if a user decides to override defaults. Usually, users forget to add defaults, and that leads to the issues. Thanks for review and suggestion! |
mimaison
left a comment
There was a problem hiding this comment.
Thanks for the quick update. Unfortunately the test you added is not exercising the code change. The test should fail if the fix is removed. Can you update it?
| } | ||
|
|
||
| @Test | ||
| public void testConfigPropertyFilteringExclude() { |
There was a problem hiding this comment.
Even without the code change, this test is passing. It's because it's calling connector.targetConfig() while the fix is in createNewTopics().
There was a problem hiding this comment.
@mimaison, thanks for the comments. Makes perfect sense.
I'm adding integration test to address your suggestion.
Along with that, i'm still keeping the unitTest committed before, so it will test configuration filtering logic without integration. The method I've added to unitTest follows the suit of the very previous test testConfigPropertyFiltering, which tests default exclude list this particular way.
mimaison
left a comment
There was a problem hiding this comment.
Thanks for the update! I'm not very keen on adding another integration test (slow and heavy) for such a small change. I suggested an alternative unit test, let me know what you think.
| assertFalse(primaryTopics.contains("mm2-offset-syncs." + BACKUP_CLUSTER_ALIAS + ".internal")); | ||
| } | ||
|
|
||
| @Test |
There was a problem hiding this comment.
Instead of an integration test, what about adding a test like this to MirrorSourceConnectorTest:
@Test
public void testNewTopicConfigs() throws Exception {
Map<String, Object> mmConfig = new HashMap<>();
mmConfig.put(DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_CONFIG, "follower\\.replication\\.throttled\\.replicas, "
+ "leader\\.replication\\.throttled\\.replicas, "
+ "message\\.timestamp\\.difference\\.max\\.ms, "
+ "message\\.timestamp\\.type, "
+ "unclean\\.leader\\.election\\.enable, "
+ "min\\.insync\\.replicas,"
+ "exclude_param.*");
DefaultConfigPropertyFilter filter = new DefaultConfigPropertyFilter();
filter.configure(mmConfig);
MirrorSourceConnector connector = spy(new MirrorSourceConnector(new SourceAndTarget("source", "target"),
new DefaultReplicationPolicy(), x -> true, filter));
String topic = "testtopic";
List<ConfigEntry> entries = new ArrayList<>();
entries.add(new ConfigEntry("name-1", "value-1"));
entries.add(new ConfigEntry("exclude_param.param1", "value-param1"));
entries.add(new ConfigEntry("min.insync.replicas", "2"));
Config config = new Config(entries);
doReturn(Collections.singletonMap(topic, config)).when(connector).describeTopicConfigs(any());
doAnswer(invocation -> {
Map<String, NewTopic> newTopics = invocation.getArgument(0);
assertNotNull(newTopics.get("source." + topic));
Map<String, String> targetConfig = newTopics.get("source." + topic).configs();
// property 'name-1' isn't defined in the exclude filter -> should be replicated
assertNotNull(targetConfig.get("name-1"), "should replicate properties");
// this property is in default list, just double check it:
String prop1 = "min.insync.replicas";
assertNull(targetConfig.get(prop1), "should not replicate excluded properties " + prop1);
// this property is only in exclude filter custom parameter, also tests regex on the way:
String prop2 = "exclude_param.param1";
assertNull(targetConfig.get(prop2), "should not replicate excluded properties " + prop2);
return null;
}).when(connector).createNewTopics(any());
connector.createNewTopics(Collections.singleton(topic), Collections.singletonMap(topic, 1L));
verify(connector).targetConfig(config);
}
What do you think?
There was a problem hiding this comment.
I think I've had to check Mockito usage before.
This makes sense, really appreciate the help. Except for last line. Not sure why we have to add hard dependency on targetConfig invocation, if assertions in doAnswer provide full test coverage. I'll submit without the verify, let me know if disagree, then I'll add verify line.
Also adding a change to the connector class itself, as it needs a visibility change for this test. And still gonna keep integration test to follow the existing approach.
Fully retested with ./gradlew connect:mirror:test
mimaison
left a comment
There was a problem hiding this comment.
Thanks for the updates, it looks good. I just left a last suggestion.
| return null; | ||
| }).when(connector).createNewTopics(any()); | ||
| connector.createNewTopics(Collections.singleton(topic), Collections.singletonMap(topic, 1L)); | ||
| } |
There was a problem hiding this comment.
I agree that calling verify on targetConfig() is not the best. But I think we should call verify on createNewTopics(any()) to ensure our mocked method, and the assertions, actually run.
There was a problem hiding this comment.
agree. with a small correction of createNewTopics(any(), any()). since the targetConfig() is being called from overloaded method with two params.
There was a problem hiding this comment.
Ah right, the code we changed is actually in createNewTopics(any(), any()). As we explicitly call that method ourselves, I don't think the verify call adds a lot of value. But I think it's fine, it shouldn't hurt
mimaison
left a comment
There was a problem hiding this comment.
Thanks for the updates, I left a couple of minor comments, we should be able to merge once they are addressed.
| public void testTopicConfigPropertyFilteringExclude() throws Exception { | ||
| // create exclude filter configuration and start MM2: | ||
| mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false"); | ||
| mm2Props.put(DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_CONFIG, "follower\\.replication\\.throttled\\.replicas, " |
There was a problem hiding this comment.
To keep the test as simple as needed, what about simply using:
mm2Props.put(DefaultConfigPropertyFilter.CONFIG_PROPERTIES_EXCLUDE_CONFIG, "delete.retention.ms");
| assertEquals(primaryConfig, backupConfig, | ||
| "`retention.bytes` should be the same, because it isn't in exclude filter! "); | ||
| assertEquals(backupConfig, "1000", | ||
| "`retention.bytes` should be the same, because it's explicitly defined! "); |
There was a problem hiding this comment.
should be the same -> should be 1000
Also let's swap the arguments as the expected value should come first:
assertEquals("1000", backupConfig, ...
There was a problem hiding this comment.
updated per suggestions
|
Thanks for the updates. The test seems to be failing when run in This integration test uses IdentityReplicationPolicy that does not rename topics. At this stage I'm tempted to just drop the integration test or maybe update one of the existing test. |
|
@mimaison yeah, missed that failure. I fixed it now. Also I addressed your suggestion to avoid running new test, and updated |
…che#11401) Reviewers: Mickael Maison <mickael.maison@gmail.com>
…che#11401) Reviewers: Mickael Maison <mickael.maison@gmail.com>
…cs-11-may-2022 * apache-kafka/3.1: (51 commits) MINOR: reload4j build dependency fixes (apache#12144) KAFKA-13255: Use config.properties.exclude when mirroring topics (apache#11401) KAFKA-13794: Fix comparator of inflightBatchesBySequence in TransactionsManager (round 3) (apache#12096) KAFKA-13794: Follow up to fix producer batch comparator (apache#12006) fix: make sliding window works without grace period (#kafka-13739) (apache#11980) 3.1.1 release notes (apache#12001) KAFKA-13794; Fix comparator of `inflightBatchesBySequence` in `TransactionManager` (apache#11991) KAFKA-13782; Ensure correct partition added to txn after abort on full batch (apache#11995) KAFKA-13748: Do not include file stream connectors in Connect's CLASSPATH and plugin.path by default (apache#11908) KAFKA-13775: CVE-2020-36518 - Upgrade jackson-databind to 2.12.6.1 (apache#11962) ...
MM2 needs to honor
config.properties.excludeproperty when it replicates initial/new topics. Updates toMirrorSourceConnectorin 2.8 break exclude filter. Prior to 2.8, MM2 would create a topic in target cluster without any config applied. The TopicSync would apply the configuration, honoring exclude filter.But in 2.8, the change has been made to create a topic on target with all the configurations from the source cluster. While doing that, no filters applied at all.
This fix ensures that the same working logic filtering excluded properties is applied while creating new topics on target cluster.
Testing Strategy:
Reproduce:
bin/kafka-configs.sh --bootstrap-server <broker_host> --alter --entity-type topics --entity-name Topic_Short_Retention --add-config retention.ms=300000config.properties.exclude=follower.replication.throttled.replicas, leader.replication.throttled.replicas, message.timestamp.difference.max.ms, message.timestamp.type, unclean.leader.election.enable, min.insync.replicas, retention.ms. Note, I copied default exclude filter and addedretention.msto itretention.mscopied from source cluster.Test the Fix:
Committer Checklist (excluded from commit message)