Skip to content

KAFKA-10842; Use InterBrokerSendThread for raft's outbound network channel#9732

Merged
hachikuji merged 10 commits intoapache:trunkfrom
hachikuji:refactor-raft-network-io
Dec 22, 2020
Merged

KAFKA-10842; Use InterBrokerSendThread for raft's outbound network channel#9732
hachikuji merged 10 commits intoapache:trunkfrom
hachikuji:refactor-raft-network-io

Conversation

@hachikuji
Copy link
Copy Markdown
Contributor

This patch contains the following improvements:

  • Separate inbound/outbound request flows so that we can open the door for concurrent inbound request handling
  • Rewrite KafkaNetworkChannel to use InterBrokerSendThread which fixes a number of bugs/shortcomings
  • Get rid of a lot of boilerplate conversions in KafkaNetworkChannel
  • Improve validation of inbound responses in KafkaRaftClient by checking correlationId. This fixes a bug which could cause an out of order Fetch to be applied incorrectly.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Copy link
Copy Markdown
Member

@ijuma ijuma Dec 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like there are many classes where direct access to data is not actually needed. How do you feel about having a public method in RequestUtils that exposes data for the raft layer instead? Something like:

public static ApiMessage requestData(AbstractRequest req)
public static ApiMessage responseData(AbstractResponse resp)

Or is it not worth it?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, not sure it's worth it, though I don't feel strongly. I think ultimately we're going to start relying more on the generated classes to avoid unnecessary conversions. We're now entering "phase 2" of the request overhaul which means we can start figuring out how to remove the AbstractRequest/AbstractResponse layer. I think it will take more smarts in the generated classes to make a dent here, but if we are agreed on the goal (?), then I do not think preserving the encapsulation here is worthwhile.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite understand how we would handle versioning well if we only have data classes. Do you have thoughts on that?

Copy link
Copy Markdown
Contributor Author

@hachikuji hachikuji Dec 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Support for optional fields would go a long way I think. I am not sure it will be possible to remove all intermediate representations, but perhaps they can be the exception and not the rule. Some version checks in KafkaApis are probably inevitable.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see this:

  1. AbstractRequest/AbstractResponse methods become part of the ApiMessage hierarchy.
  2. FooRequest/FooResponse extends FooDataRequest/FooDataResponse (like Colin suggested before)

But I don't think you want to eliminate FooRequest/FooResponse in the example above. You don't need to perform conversions for the inner classes, but it's a place where you can normalize the representation. We do that for many of the existing request/response classes.

@hachikuji hachikuji force-pushed the refactor-raft-network-io branch 2 times, most recently from 0a83589 to b5e476b Compare December 11, 2020 19:41
Comment thread core/src/main/scala/kafka/common/InterBrokerSendThread.scala Outdated
Comment thread core/src/main/scala/kafka/common/InterBrokerSendThread.scala Outdated
Comment thread core/src/main/scala/kafka/common/InterBrokerSendThread.scala Outdated
Comment thread core/src/main/scala/kafka/common/InterBrokerSendThread.scala Outdated
Comment thread core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala Outdated
Comment thread core/src/main/scala/kafka/server/BrokerToControllerChannelManagerImpl.scala Outdated
Comment thread core/src/main/scala/kafka/common/InterBrokerSendThread.scala Outdated
Comment thread core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala Outdated
Comment thread raft/src/main/java/org/apache/kafka/raft/RaftMessageQueue.java Outdated
Comment thread raft/src/main/java/org/apache/kafka/raft/internals/BlockingMessageQueue.java Outdated
@abbccdda
Copy link
Copy Markdown

Just to reply #9732 (comment) here, generateRequests() is part of the InterBrokerSender thread which I didn't write up. In the meantime, I think changing it to one element at a time makes sense for AlterIsr and forwarding cases for now, if we believe that would make the interface easier to be used, let's refactor it.

@hachikuji
Copy link
Copy Markdown
Contributor Author

@abbccdda To clarify, what we are asking is why the implementation of generateRequests in BrokerToControllerChannelManager builds a queue for a single item.

Comment thread core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala Outdated
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth adding a size or isEmpty to UnsentRequests?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: (related to style question elsewhere) if we want to change the style of these class definitions, can we do it as a separate PR? I always find it difficult when style changes are conflated with logical changes

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fair. I actually held myself back. I tried to only touch the cases that I was modifying anyway, but let me know if there are others. This one was especially obnoxious because of the long parameter list to InterBrokerSendThread.

Comment thread core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala Outdated
Comment thread raft/src/main/java/org/apache/kafka/raft/RaftMessageQueue.java Outdated
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@hachikuji hachikuji force-pushed the refactor-raft-network-io branch from ebb40be to 74cbbf8 Compare December 15, 2020 22:27
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we're not sharing these channel managers between anything. In that case, moving them into the classes that need them seems fine.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to use Iterator here instead of queue methods (i.e., peek and remove). Is it to ensure a consistent view of the queue while we're going through it?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I just thought it was a little simpler.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants