KAFKA-13763 (1): Improve unit testing coverage and flexibility for IncrementalCooperativeAssignor#11974
Conversation
…s at the end of test cases
…or generation mismatch
…emove unnecessary expectedMemberConfigs field
…for redundant assignments during tests
…eaving the cluster
… logic for examining aggregated assignment data
…t and worker metadata
…ertConnectorAllocations, and assertTaskAllocations methods
…ary duplication of worker assignments
|
There's still room for improvement with the incremental rebalancing testing logic, but many of the remaining changes would involve modifications to the |
|
Filed #11983 as a more aggressive follow-up that touches on the |
| import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2; | ||
| import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad; | ||
| import static org.hamcrest.CoreMatchers.hasItems; | ||
| import static org.hamcrest.CoreMatchers.is; |
There was a problem hiding this comment.
Could we remove hamcrest now and only use junit?
| import static org.junit.Assert.assertNotNull; | ||
| import static org.junit.Assert.assertNull; | ||
| import static org.junit.Assert.assertTrue; | ||
| import static org.junit.runners.Parameterized.Parameter; |
There was a problem hiding this comment.
Not related to changes in this PR. I see we are defining a test parameter with mode() but the class is not annotated with @RunWith(value = Parameterized.class) so protocolVersion is never initialized and always set to 0.
There was a problem hiding this comment.
Ah, good catch! I've corrected this in the test, although it's worth noting that in #11983 I'm proposing that we remove the protocolVersion parameter entirely since it's not accomplishing a whole lot here.
| private Map<String, ExtendedWorkerState> memberConfigs(String givenLeader, | ||
| long givenOffset, | ||
| int start, | ||
| int connectorNum) { |
There was a problem hiding this comment.
Should this be workerNum instead of connectorNum?
| return IntStream.range(start, connectorNum + 1) | ||
| .mapToObj(i -> new SimpleEntry<>("connector" + i, state)) | ||
| .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)); | ||
| return fillMap( |
There was a problem hiding this comment.
I wonder if building this map explicitly may be more readable. What about something like:
private Map<String, ExtendedWorkerState> memberConfigs(String givenLeader, long givenOffset, String ... workers)
Then you call it with memberConfigs(leader, offset, W0, W1). It's slightly less flexible, but since all tests only use a few workers I think this could make the code a bit simpler.
There was a problem hiding this comment.
Agree that it's more readable, and in some ways it's actually more flexible since you can now specify an arbitrary set of worker names instead of having them generated for you. Done 👍
| @Parameters | ||
| public static Iterable<?> mode() { | ||
| return Arrays.asList(new Object[][] {{CONNECT_PROTOCOL_V1, CONNECT_PROTOCOL_V2}}); | ||
| return Arrays.asList(new Object[][] {{CONNECT_PROTOCOL_V1}, {CONNECT_PROTOCOL_V2}}); |
There was a problem hiding this comment.
This can be simplified into return Arrays.asList(CONNECT_PROTOCOL_V1, CONNECT_PROTOCOL_V2); but since you're removing it in #11983, we can ignore it.
There was a problem hiding this comment.
Blegh, thanks. I'll simplify it anyways; rather leave trunk in a healthy place and not risk FUD from someone else copy+pasting this style.
| private void expectGeneration(boolean expectMismatch) { | ||
| when(coordinator.generationId()) | ||
| .thenReturn(assignor.previousGenerationId + 1) | ||
| .thenReturn(assignor.previousGenerationId + 1); |
There was a problem hiding this comment.
Since we return the same value, we can remove one of these thenReturn()
There was a problem hiding this comment.
Ah, good call. I figured the existing style was necessary since WorkerCoordinator::generationId would be invoked twice for every call to expectGeneration but it looks like Mockito handles this just fine with a single thenReturn.
| memberConfigs); | ||
|
|
||
| assertThat("Wrong set of workers for reassignments", | ||
| assertEquals("Wrong set of workers for reassignments", |
There was a problem hiding this comment.
What about simplifying these assertions into:
assertTrue("Wrong set of workers for reassignments", assignor.candidateWorkersForReassignment.isEmpty());
There was a problem hiding this comment.
I usually prefer assertEquals when possible since it shows the difference between actual and expected, which in this case would mean the complete set of incorrect candidate workers for reassignment.
|
Thanks @mimaison. I hope you don't mind but I've pushed one more change that simplifies how the configured set of connectors and tasks is simulated (this came in handy when designing tests for one of the scenarios described in KAFKA-13764). It should improve readability by removing the existing |
Jira
This is strictly a testing refactor. No functional changes are made; this can be easily verified by confirming that the only affected file is the
IncrementalCooperativeAssignorTest.javatest suite.These changes were initially discussed during review of #10367, which partially focused on improving readability in the unit tests for incremental rebalancing in Connect.
The goals here include:
performRebalanceandaddNewEmptyWorkers)expectedMemberConfigsandassignments) and assertions (like the redundant checks for leader and leader URL)assertAssignmentsmethod withassertWorkers,assertEmptyAssignment,assertConnectorAllocations, andassertTaskAllocations, and by adding the newassertBalancedAllocationandassertCompleteAllocationmethods)List::removeAllremoves all occurrences of any element contained in the collection passed to the method)assertNoReassignments(now renamed toassertNoRedundantAssignments) method that there should be no duplicated connectors or tasks in the assignments reported by each worker to the leader during rebalance (this is unnecessary and even contradicts logic used for testing in cases like this where we intentionally simulate a worker with a duplicated set of connectors and/or tasks rejoining a cluster; the only reason this bug wasn't surfaced sooner is because the bug mentioned in the prior point covers it)Once merged, this should allow for cleaner, faster test writing when adding new cases for incremental rebalancing, such as with #10367.
Committer Checklist (excluded from commit message)