KAFKA-6054: Code cleanup to prepare the actual fix for an upgrade path #4630
mjsax merged 3 commits into apache:trunk from
Conversation
@bbejeck @vvcephei @guozhangwang @dguy This is almost a pure code cleanup PR -- required to get clean code for introducing the next metadata version. One change: we should not auto-upgrade the received subscription info (from version 1 to 2); otherwise the leader does not know what it received and which encoding to use for the assignment it sends back (it must use version 1 encoding if at least one version 1 subscription was received).
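The leader-side rule described above can be sketched as follows. This is a minimal, hypothetical illustration (class and method names are made up, not the actual StreamsPartitionAssignor code): the leader must encode its assignments with the lowest metadata version it received, so that version 1 members can still decode the response.

```java
import java.util.Arrays;
import java.util.List;

public class VersionNegotiation {
    static final int LATEST_SUPPORTED_VERSION = 2;

    // Returns the encoding version the leader should use for the assignment:
    // the minimum over all received subscription versions.
    static int assignmentVersion(final List<Integer> receivedSubscriptionVersions) {
        int minVersion = LATEST_SUPPORTED_VERSION;
        for (final int version : receivedSubscriptionVersions) {
            minVersion = Math.min(minVersion, version);
        }
        return minVersion;
    }

    public static void main(final String[] args) {
        // A single version 1 subscription forces version 1 encoding for everyone.
        System.out.println(assignmentVersion(Arrays.asList(2, 1, 2))); // prints 1
        System.out.println(assignmentVersion(Arrays.asList(2, 2)));    // prints 2
    }
}
```

This also shows why auto-upgrading on decode is harmful: if every received subscription already reports version 2, the minimum is always 2 and the leader can never detect the version 1 member.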
One meta-comment might be worth addressing. Otherwise LGTM.
guozhangwang
left a comment
Left some comments.
    encodeVersionOneData(out);
}

private void encodeVersionOneData(final DataOutputStream out) throws IOException {
I'd suggest we change these to encodeAssignedTasksData and encodeHostPartitionsData correspondingly. In future versions we may remove some of the old encoded data, so the current names could be misleading.
    throw new IllegalStateException("Unknown metadata version: " + usedVersion
        + "; latest supported version: " + SubscriptionInfo.LATEST_SUPPORTED_VERSION);
}
if (info.version() < minUserMetadataVersion) {
nit: use usedVersion, and the same in the next line.
switch (usedVersion) {
    case 1:
        processVersionOneAssignment(info, partitions, activeTasks);
        partitionsByHost = new HashMap<>();
nit: use Collections.emptyMap() instead of allocating a new HashMap.
        buf = encodeVersionOne();
        break;
    case 2:
        byte[] endPointBytes = null;
Merge these two lines.
assertEquals(2, decoded.version); // automatically upgraded to v2 on decode
assertEquals(oldVersion.activeTasks(), decoded.activeTasks());
assertEquals(oldVersion.standbyTasks(), decoded.standbyTasks());
assertNull(decoded.partitionsByHost()); // should be empty as wasn't in V1
nit: update the comment in this line.
 */
@Override
- public Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions) {
+ public Map<String, Assignment> assign(final Cluster metadata,
Could we add a unit test with two subscriptions, v1 and v2, and then check that the returned assignment map contains v1?

@mjsax Just clarifying: with this patch, KAFKA-6054 is still not resolved, right?

Correct. This is not a fix for KAFKA-6054 -- I plan to piggyback the fix on the RocksDB upgrade story; we need to add a config to fix it and thus need a KIP.

Updated this. @bbejeck I followed Guozhang's suggestion to rename the encoding methods -- this should address your comment, too?
    partitionAssignor.configure(config);
}

public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions() {

LGTM! Left a minor comment on the PR; please feel free to merge after addressing it.

Updated. Will wait for build to pass.
Small change in decoding version 1 metadata: don't upgrade to version 2 automatically
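The decode change summarized above can be sketched roughly as follows. This is a simplified, hypothetical illustration (the real SubscriptionInfo.decode reads many more fields): the point is only that the received version byte is kept as-is rather than silently rewritten from 1 to 2, so the leader later knows which encoding each member understands.

```java
import java.nio.ByteBuffer;

public class SubscriptionInfoDecodeSketch {
    final int usedVersion;

    SubscriptionInfoDecodeSketch(final int usedVersion) {
        this.usedVersion = usedVersion;
    }

    static SubscriptionInfoDecodeSketch decode(final ByteBuffer data) {
        data.rewind();
        final int usedVersion = data.getInt();
        // Before this PR, a version 1 subscription was upgraded to version 2
        // here; after the change the original version is preserved as-is.
        return new SubscriptionInfoDecodeSketch(usedVersion);
    }

    public static void main(final String[] args) {
        final ByteBuffer v1Data = ByteBuffer.allocate(4).putInt(1);
        System.out.println(decode(v1Data).usedVersion); // prints 1, not 2
    }
}
```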