Skip to content

KAFKA-12648: basic skeleton API for NamedTopology#10615

Merged
ableegoldman merged 12 commits intoapache:trunkfrom
ableegoldman:12648-basic-NamedTopology-API
May 1, 2021
Merged

KAFKA-12648: basic skeleton API for NamedTopology#10615
ableegoldman merged 12 commits intoapache:trunkfrom
ableegoldman:12648-basic-NamedTopology-API

Conversation

@ableegoldman
Copy link
Copy Markdown
Member

Basically just the API of #10609, should not contain any logical changes in Streams at this point (for example I also stripped out the protocol change for now, even though it's more or less done)

@ableegoldman
Copy link
Copy Markdown
Member Author

@wcarlson5 @guozhangwang @rodesai -- Walker let me know if there's anything else you need, but I think this should be sufficient to unblock work on the POC while I continue filling in the guts

public void writeTo(final DataOutputStream out, final int version) throws IOException {
out.writeInt(topicGroupId);
out.writeInt(partition);
if (version >= MIN_NAMED_TOPOLOGY_VERSION) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although I did strip out the protocol change, I left this in since it doesn't affect anything until we actually bump the protocol version (which I did take out of this PR)


final Set<TaskId> expectedClientITasks = new HashSet<>(asList(TASK_0_0, TASK_0_1, TASK_1_0, TASK_0_5));
final Set<TaskId> expectedClientIITasks = new HashSet<>(asList(TASK_0_2, TASK_0_3, TASK_0_4));
final Set<TaskId> allTasks = new HashSet<>(asList(TASK_0_0, TASK_0_1, TASK_1_0, TASK_0_5, TASK_0_2, TASK_0_3, TASK_0_4));
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might seem weird that some of these tests started failing since this PR is just adding an unused API, but it's just because they're very rigid and tend to assert a specific task assignment rather than validating the assignment contract, eg that tasks are evenly balanced. I'm guessing that just adding a field to the TaskId class, even though it's unused, resulted in a different hash and therefore ordering in some unsorted data structure somewhere in the assignor.

I did verify that the new test results are still correct within the expected constraints, then adapted the tests to be more durable rather than just modifying the expected task assignments

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense, however you might be able not include the new field from the hash to prevent a chaotic assignment if you wanted

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, but we'll want to add it eventually so I just left this part in. I wanted to keep all the changes in TaskId to avoid merge hell and that included adding it to the hash.

I see it as an opportunity to improve some of the tests that we should have done anyways

Copy link
Copy Markdown
Contributor

@wcarlson5 wcarlson5 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

overall this makes sense!


import org.apache.kafka.streams.Topology;

public class NamedTopology {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it be possible to add an api to get the topics the NamedTopology depends on directly? It seems that is the only this we care about when it comes to assigning them to stream runtimes right?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yeah, good call. Will do


final Set<TaskId> expectedClientITasks = new HashSet<>(asList(TASK_0_0, TASK_0_1, TASK_1_0, TASK_0_5));
final Set<TaskId> expectedClientIITasks = new HashSet<>(asList(TASK_0_2, TASK_0_3, TASK_0_4));
final Set<TaskId> allTasks = new HashSet<>(asList(TASK_0_0, TASK_0_1, TASK_1_0, TASK_0_5, TASK_0_2, TASK_0_3, TASK_0_4));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense, however you might be able not include the new field from the hash to prevent a chaotic assignment if you wanted

@ableegoldman ableegoldman force-pushed the 12648-basic-NamedTopology-API branch from 2680744 to f720d7b Compare May 1, 2021 00:08
@ableegoldman
Copy link
Copy Markdown
Member Author

Just some unrelated test failures in RaftClusterTest (aka, the usual these days 😞 ). Once I get sign-off from @guozhangwang it should be good to merge

Copy link
Copy Markdown
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@ableegoldman ableegoldman merged commit 16b2ce7 into apache:trunk May 1, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants