Skip to content

KAFKA-15045: (KIP-924 pt. 15) Implement #defaultStandbyTaskAssignment and finish rack-aware standby optimization#16129

Merged
ableegoldman merged 6 commits intoapache:trunkfrom
apourchet:KIP-924-15
May 30, 2024
Merged

KAFKA-15045: (KIP-924 pt. 15) Implement #defaultStandbyTaskAssignment and finish rack-aware standby optimization#16129
ableegoldman merged 6 commits intoapache:trunkfrom
apourchet:KIP-924-15

Conversation

@apourchet
Copy link
Copy Markdown
Contributor

This fills in the implementation details of the standby task assignment utility functions within TaskAssignmentUtils.

apourchet added 2 commits May 29, 2024 17:18
This fills in the implementation details of the standby task assignment
utility functions within TaskAssignmentUtils.
@apourchet apourchet changed the title KAFKA-15045: (KIP-924 pt. 15) More TaskAssigmentUtils implementation [WIP] KAFKA-15045: (KIP-924 pt. 15) More TaskAssigmentUtils implementation May 29, 2024
Comment on lines +308 to +309
final String rack1 = clientRacks.get(clientState1.processId().id()).get();
final String rack2 = clientRacks.get(clientState2.processId().id()).get();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Future cleanup: we can actually get rid of this clientRacks data structure altogether, and just use the rack ids we already processed and nicely organized in the KafkaStreamsState which is addressable by UUID, ie

Suggested change
final String rack1 = clientRacks.get(clientState1.processId().id()).get();
final String rack2 = clientRacks.get(clientState2.processId().id()).get();
final String rack1 = kafkaStreamsStates.get(clientState1.processId().id()).rackId().get();
final String rack2 = kafkaStreamsStates.get(clientState2.processId().id()).rackId().get();

Copy link
Copy Markdown
Member

@ableegoldman ableegoldman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few more things that need to be addressed but can be done in a followup PR

Function.identity(),
taskId -> {
final Set<String> stateStoreNames = topologyMetadata
.stateStoreNameToSourceTopicsForTopology(taskId.topologyName())
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah somehow I missed this before -- this is actually returning all the state stores for this topology, it's not specific to the taskId. This was an existing issue so we don't need to fix it in this PR, it can be addressed in a followup. It might be a bit complicated so I'll take a look at how we can get this info

Would've caught this during testing since we definitely want tests with mixed stateless-and-stateful tasks, but still good to fix ASAP

@ableegoldman
Copy link
Copy Markdown
Member

Test failures are unrelated. Merging to trunk

@ableegoldman ableegoldman changed the title KAFKA-15045: (KIP-924 pt. 15) More TaskAssigmentUtils implementation KAFKA-15045: (KIP-924 pt. 15) Implement #defaultStandbyTaskAssignment and finish rack-aware standby optimization May 30, 2024
@ableegoldman ableegoldman merged commit 370e5ea into apache:trunk May 30, 2024
chiacyu pushed a commit to chiacyu/kafka that referenced this pull request Jun 1, 2024
… and finish rack-aware standby optimization (apache#16129)

This fills in the implementation details of the standby task assignment utility functions within TaskAssignmentUtils.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
wernerdv pushed a commit to wernerdv/kafka that referenced this pull request Jun 3, 2024
… and finish rack-aware standby optimization (apache#16129)

This fills in the implementation details of the standby task assignment utility functions within TaskAssignmentUtils.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
TaiJuWu pushed a commit to TaiJuWu/kafka that referenced this pull request Jun 8, 2024
… and finish rack-aware standby optimization (apache#16129)

This fills in the implementation details of the standby task assignment utility functions within TaskAssignmentUtils.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
gongxuanzhang pushed a commit to gongxuanzhang/kafka that referenced this pull request Jun 12, 2024
… and finish rack-aware standby optimization (apache#16129)

This fills in the implementation details of the standby task assignment utility functions within TaskAssignmentUtils.

Reviewers: Anna Sophie Blee-Goldman <ableegoldman@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants