KAFKA-18303; Update ShareCoordinator to use new record format#18396
KAFKA-18303; Update ShareCoordinator to use new record format#18396dajac merged 4 commits intoapache:trunkfrom
Conversation
| short SHARE_SNAPSHOT_RECORD_KEY_VERSION = 0; | ||
| short SHARE_SNAPSHOT_RECORD_VALUE_VERSION = 0; | ||
| short SHARE_UPDATE_RECORD_KEY_VERSION = 1; | ||
| short SHARE_UPDATE_RECORD_VALUE_VERSION = 1; |
There was a problem hiding this comment.
This is a bug. The version of the update value record must be 0.
There was a problem hiding this comment.
@AndrewJSchofield Is this something that we want to fix in 4.0?
There was a problem hiding this comment.
Pardon me, why not fix it for 4.0? I assume it would be an issue to 4.1 in reading the records having incorrect version.
There was a problem hiding this comment.
We don't support migration of the share-group state topic data from 4.0 to 4.1, so not strictly necessary. But really, it is not ideal. Probably should fix it. I'll open an issue.
| // Noop | ||
| } | ||
| } catch (UnsupportedVersionException ex) { | ||
| // Ignore |
There was a problem hiding this comment.
I ignore unknown records to follow the current implementation.
| .setDeliveryState(batch.deliveryState())) | ||
| .collect(Collectors.toList())), | ||
| ShareCoordinator.SHARE_SNAPSHOT_RECORD_VALUE_VERSION) | ||
| (short) 0) |
There was a problem hiding this comment.
It is better to hardcode the version here in order to avoid using the wrong ones as it was the case.
| protected ApiMessage apiMessageKeyFor(short recordVersion) { | ||
| switch (recordVersion) { | ||
| case ShareCoordinator.SHARE_SNAPSHOT_RECORD_KEY_VERSION: | ||
| case 0: |
There was a problem hiding this comment.
This is a temporary step backward. I will rework it in https://issues.apache.org/jira/browse/KAFKA-18308.
AndrewJSchofield
left a comment
There was a problem hiding this comment.
Thanks for the PR. Handling the records in this way is much nicer.
| handleShareUpdate((ShareUpdateKey) key.message(), (ShareUpdateValue) messageOrNull(value)); | ||
| break; | ||
| default: | ||
| // Noop |
There was a problem hiding this comment.
maybe we can use java 17 enhanced switch to eliminate unnecessary default branch.
…og-compaction-write-record-v2 * apache-github/trunk: (34 commits) MINOR: Bump year to 2025 in NOTICE file (apache#18427) KAFKA-18411 Remove ZkProducerIdManager (apache#18413) KAFKA-18408 tweak the 'tag' field for BrokerHeartbeatRequest.json, BrokerRegistrationChangeRecord.json and RegisterBrokerRecord.json (apache#18421) KAFKA-18414 Remove KRaftRegistrationResult (apache#18401) KAFKA-17921 Support SASL_PLAINTEXT protocol with java.security.auth.login.config (apache#17671) KAFKA-18384 Remove ZkAlterPartitionManager (apache#18364) KAFKA-10790: Add deadlock detection to producer#flush (apache#17946) KAFKA-18412: Remove EmbeddedZookeeper (apache#18399) MINOR : Improve Exception log in NotEnoughReplicasException(apache#12394) MINOR: Improve PlaintextAdminIntegrationTest#testConsumerGroups (apache#18409) MINOR: Remove unused local variable (apache#18410) MINOR: Remove RaftManager.maybeDeleteMetadataLogDir and AutoTopicCreationManagerTest.scala (apache#17365) KAFKA-18368 Remove TestUtils#MockZkConnect and remove zkConnect from TestUtils#createBrokerConfig (apache#18352) MINOR: Update Consumer group timeout default to 30 sec (apache#16406) MINOR: Fix typo in CommitRequestManager (apache#18407) MINOR: cleanup JavaDocs for deprecation warnings (apache#18402) KAFKA-18303; Update ShareCoordinator to use new record format (apache#18396) MINOR: Update Consumer and Producer JavaDocs for committing offsets (apache#18336) KAFKA-16446: Improve controller event duration logging (apache#15622) KAFKA-18388 test-kraft-server-start.sh should use log4j2.yaml (apache#18370) ...
…#18396) Following apache#18261, this patch updates the Share Coordinator to use the new record format. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Andrew Schofield <aschofield@confluent.io>
Following #18261, this patch updates the Share Coordinator to use the new record format.
Committer Checklist (excluded from commit message)