KAFKA-6145: Pt 2.5 Compute overall task lag per client#8252
KAFKA-6145: Pt 2.5 Compute overall task lag per client#8252vvcephei merged 7 commits intoapache:trunkfrom
Conversation
d180129 to
9fb5fb6
Compare
c46b133 to
d83f64b
Compare
|
test this please |
118c3a9 to
c89ac5c
Compare
|
test this please |
|
ok to test |
|
test this please |
bd0884c to
0bada7e
Compare
fetch end offsets add lag to each clienttate build ranks add ClientState tests add tests for building client rankings fall back to prev assignment if fetching offsets fails
0bada7e to
4200590
Compare
4200590 to
3cf72e0
Compare
|
test this please |
1 similar comment
|
test this please |
vvcephei
left a comment
There was a problem hiding this comment.
Thanks, @ableegoldman ! Looks good overall, just a few remarks.
| static Map<TopicPartition, ListOffsetsResultInfo> fetchEndOffsetsWithoutTimeout(final Collection<TopicPartition> partitions, | ||
| final Admin adminClient) { | ||
| return fetchEndOffsets(partitions, adminClient, null); | ||
| } | ||
|
|
||
| public static Map<TopicPartition, ListOffsetsResultInfo> fetchEndOffsets(final Collection<TopicPartition> partitions, | ||
| final Admin adminClient, | ||
| final Duration timeout) { | ||
| final Map<TopicPartition, ListOffsetsResultInfo> endOffsets; | ||
| try { | ||
| final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> future = adminClient.listOffsets( | ||
| partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest()))) | ||
| .all(); | ||
| if (timeout == null) { | ||
| endOffsets = future.get(); | ||
| } else { | ||
| endOffsets = future.get(timeout.toMillis(), TimeUnit.MILLISECONDS); | ||
| } | ||
| } catch (final TimeoutException | RuntimeException | InterruptedException | ExecutionException e) { | ||
| throw new StreamsException("Unable to obtain end offsets from kafka", e); | ||
| } | ||
| return endOffsets; | ||
| } |
There was a problem hiding this comment.
The fact that this is static and has nothing in particular to do with the KafkaStreams class indicates that it probably belongs in a util class for use by KafkaStreams and StreamsPartitionAssignor.
| } | ||
|
|
||
| @Test | ||
| public void shouldViolateBalanceToPreserveActiveTaskStickiness() { |
There was a problem hiding this comment.
Does this test pass on the current code base, or is it a consequence of this PR?
There was a problem hiding this comment.
It's testing the newly added #preservePreviousTaskAssignment: this is sort of a temporary hack to force the StickyTaskAssignor to return a "completely sticky" active task assignment, in the case the admin listOffsets request fails or times out.
The default StickyTaskAssignor behavior is unchanged in this PR, to leave the code in a stable state and avoid adding a huge refactoring of StickyTaskAssignorTest to an already large PR.
There was a problem hiding this comment.
Ok, thanks. The subtlety was a bit lost on me.
|
test this please |
1 similar comment
|
test this please |
|
Test failures unrelated: |
Once we have encoded the offset sums per task for each client, we can compute the overall lag during
assignby fetching the end offsets for all changelog and subtracting.If the
listOffsetsrequest fails, we simply return a "completely sticky" assignment, ie all active tasks are given to previous owners regardless of balance.Builds (but does not yet use) the
statefulTasksToRankedCandidatesmap with the ranking:Rank -1: active running task
Rank 0: standby or restoring task whose overall lag is within
acceptableRecoveryLagRank 1: tasks whose lag is unknown (eg during version probing)
Rank 1+: all other tasks are ranked according to their actual total lag