KAFKA-16254: Allow MM2 to fully disable offset sync feature#15999
KAFKA-16254: Allow MM2 to fully disable offset sync feature#15999mimaison merged 39 commits intoapache:trunkfrom
Conversation
chia7712
left a comment
There was a problem hiding this comment.
@OmniaGM I have left some comments in the discussion thread (https://lists.apache.org/thread/2scgvn66vs8c04ldxsj4sw2vlo6wd98o). It would be great if you have free time to take a look.
For another code style idea: Could we have a new static class including all "emit"-related code? That will have following benefits.
- more readable. The static class can be null (or empty) to "prove" that this function is "disabled"
- no unused objects are created. For example,
offsetProducer,delayedOffsetSyncs,pendingOffsetSyncsare unused if the emit is disable - easy to test emit behavior as all they in single static class now.
WDYT?
5ae6390 to
3038580
Compare
We kinda have |
10eb256 to
557591b
Compare
436ce41 to
4958ac1
Compare
|
just remind that today is the deadline for feature freeze. If this PR is necessary for 3.8.0, we need to cherry-pick to branch-3.8 |
|
I was hoping for it to land in 3.8.0 I'll raise a PR against 3.8 once we merge it into trunk. however It shouldn't be a blocker for 3.8 |
mimaison
left a comment
There was a problem hiding this comment.
Thanks for the PR! I left a few comments.
|
@C0urante I believe I addressed all the comments now, can you please have a second look? |
C0urante
left a comment
There was a problem hiding this comment.
Thanks, this is looking really good! Had some more thoughts on the validation logic but apart from that I think we're basically there. The integration tests (minus the accidental failed creation for the MirrorCheckpointConnector) look especially nice 👍
66e0246 to
fd6e0e0
Compare
mimaison
left a comment
There was a problem hiding this comment.
Thanks for the updates. I made another pass. It looks good, I just left a few small suggestions.
| List<ConfigValue> configValues = super.validate(connectorConfigs).configValues(); | ||
| MirrorCheckpointConfig.validate(connectorConfigs).forEach((config, errorMsg) -> | ||
| configValues.stream() | ||
| .filter(conf -> conf.name().equals(config)) |
There was a problem hiding this comment.
EMIT_OFFSET_SYNCS_ENABLED is in MirrorSourceConfig, so it is not a part of config def of MirrorCheckpointConnector. Hence, the error related to EMIT_OFFSET_SYNCS_ENABLED can't be propagated.
There was a problem hiding this comment.
The following test shows my concern:
@Test
public void test() {
Map<String, String> props = new HashMap<>();
props.put(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED, "false");
MirrorCheckpointConnector connector = new MirrorCheckpointConnector();
Config config = connector.validate(props);
assertEquals(1, config.configValues().stream().filter(c -> c.name().equals(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED)).count());
}There was a problem hiding this comment.
I moved this to MirrorConnectorConfig which all inherit. So this should be addressed now
There was a problem hiding this comment.
I still agree with @mimaison that we shouldn't have this property in the MirrorConnectorConfig class, since then it'll be included as a "common config" for all connectors in our generated docs here instead of in the MirrorSourceConnector-specific docs here.
Can we just add a new ConfigValue for the property in MirrorCheckpointConnector::validate if we find a validation error with it?
There was a problem hiding this comment.
I updated MirrorCheckpointConnector::validate to create configValue if the name of the config not defined
There was a problem hiding this comment.
Sorry for the confusion I just lost track of @mimaison point from before! this why I reverted config back to MirrorConnectorConfig
There was a problem hiding this comment.
No worries! Latest looks good to me 👍
@chia7712 thoughts?
| boolean offsetSyncsConfigured = props.keySet().stream() | ||
| .anyMatch(conf -> conf.startsWith(OFFSET_SYNCS_CLIENT_ROLE_PREFIX) || conf.startsWith(OFFSET_SYNCS_TOPIC_CONFIG_PREFIX)); | ||
|
|
||
| if ("false".equals(props.get(MirrorConnectorConfig.EMIT_OFFSET_SYNCS_ENABLED)) && offsetSyncsConfigured) { |
There was a problem hiding this comment.
what if EMIT_OFFSET_SYNCS_ENABLED=true and offsetSyncsConfigured=false? Should we add error message for it?
There was a problem hiding this comment.
No, because offsetSyncsConfigured has default which is the global configs for topics and clients
|
|
||
| if (emitCheckpointDisabled && syncGroupOffsetsDisabled) { | ||
| Arrays.asList(SYNC_GROUP_OFFSETS_ENABLED, EMIT_CHECKPOINTS_ENABLED).forEach(configName -> { | ||
| invalidConfigs.putIfAbsent(configName, "MirrorCheckpointConnector can't run with both " + SYNC_GROUP_OFFSETS_ENABLED + " and " + |
There was a problem hiding this comment.
It seems to me EMIT_CHECKPOINTS_ENABLED does not obstruct MirrorCheckpointConnector from running since it is used to update consumer groups offsets of target cluster. By contrast, SYNC_GROUP_OFFSETS_ENABLED do impact the MirrorCheckpointConnector
There was a problem hiding this comment.
There was a problem hiding this comment.
I didn't realize that we disabled the connector from generating task configs if checkpoints were disabled. In that case, I think we should remove the check for sync.group.offsets.enabled altogether and just check emit.checkpoints.enabled. But generally I agree with @OmniaGM that if we don't generate task configs for the connector when it's configured that way, it's best to surface this as a validation error instead of silently degrading to a no-op.
There was a problem hiding this comment.
It seems to me EMIT_CHECKPOINTS_ENABLED does not obstruct MirrorCheckpointConnector from running since it is used to update consumer groups offsets of target cluster. By contrast, SYNC_GROUP_OFFSETS_ENABLED do impact the MirrorCheckpointConnector
I just notice that I use the incorrect config name :(
My understanding that it will not start a task so the connector would be doing nothing without any feedback or indication for why.
agree and my point was "we should return invalid config if EMIT_CHECKPOINTS_ENABLED=false", because SYNC_GROUP_OFFSETS_ENABLED=false is allowed to MirrorCheckpointConnector . For example: in this case: EMIT_CHECKPOINTS_ENABLED=false and SYNC_GROUP_OFFSETS_ENABLED=true we should return invalid configs, right?
There was a problem hiding this comment.
I removed the validation for sync.group.offsets.enabled and kept emit.checkpoints.enabled validation only
|
Thanks All for reviewing this PR. I reviewed the failed tests and they are unrelated. |
…5999) Reviewers: Mickael Maison <mickael.maison@gmail.com>, Chris Egerton <fearthecellos@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>, Igor Soarez <soarez@apple.com>
Implementation of KIP-1031 https://cwiki.apache.org/confluence/display/KAFKA/KIP-1031%3A+Control+offset+translation+in+MirrorSourceConnector
Committer Checklist (excluded from commit message)