Skip to content

PROPOSAL: support async event execution in group coordinator#14705

Closed
artemlivshits wants to merge 1 commit intoapache:trunkfrom
artemlivshits:gc-async-event
Closed

PROPOSAL: support async event execution in group coordinator#14705
artemlivshits wants to merge 1 commit intoapache:trunkfrom
artemlivshits:gc-async-event

Conversation

@artemlivshits
Copy link
Copy Markdown
Contributor

This change fixes a broken abstraction where event execution relies on specific implementation detail of the ReplicaManager.appendRecords that with some arguments it is completed synchronously even though the interface is clearly asynchronous. This assumption can be broken by changing implementation, as shown by KIP-890 work that added transaction verification stage that may result in asynchronous completion (which should be perfectly fine because the function interface is asynchronous and must be used as such) and violate the assumption of event execution.

Now the event execution supports asynchronous completion and can properly handle asynchronous completion of the underlying functionality.

More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.

Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

This change fixes a broken abstraction where event execution relies on
specific implementation detail of the ReplicaManager.appendRecords that
with some arguments it is completed synchronously even though the
interface is cleary asynchronous.  This assumption can be broken by
changing implementation, as shown by KIP-890 work that added transaction
verification stage that may result in asynchronous completion (which
should be perfectly fine because the function interface is asynchronous
and must be used as such) and violate the assumption of event execution.

Now the event execution supports asynchronous completion and can
properly handle asynchnornous completion of the underlying funcionality.
Copy link
Copy Markdown
Contributor

@junrao junrao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@artemlivshits : Thanks for the PR. Left a comment.

@Override
public void run() {
try {
runAsync().get();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, while this guarantees ordering, it disables pipelining and thus potentially reduces the throughput, since we have to wait for each event's records to be fully replicated before processing the next event.

We probably could introduce a different callback in ReplicaManager.appendRecords that's invoked when the records are appended to the local log.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We actually don't need to wait for replication, so the current pipelining works without changes -- the current logic uses acks=1 and captures the offset and then waits for HWM to be advanced to complete the write request. It may prevent potential pipelining opportunities if new async stages are added for acks=1 (e.g. transaction verification). But the most important thing is that with this proposal, innovating under appendRecords interface would just work out of box, which is the purpose of having interfaces -- innovating under the interface doesn't break callers that use interface correctly (which makes system modular).

If we find out that we want the pipelining for transaction verification we can make this optimization later (if we find it to be a problem). We will have a choice between complexity and potentially better pipelining; with the current model, we don't have the choice -- the workflow will break if we add an async state to acks=1 processing and will have to fix it before shipping.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation, Artem. Yes, it's true that the new group coordinator only depends on acks=1.

Each new group coordinator thread handles requests from multiple groups and multiple clients within the same group. In the proposed approach, if one client's log append is blocked for additional async check, it blocks the processing of other clients and other groups. So, it still seems to reduce the overall throughput somewhat.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if one client's log append is blocked for additional async check

That is correct, it may become a perf problem, we can measure and see if it's worth fixing in practice, we'll have this choice (as well as the choice to postpone the fix, if we have time pressure to release). But it won't be a functional problem. Right now it is a functional problem, which is suboptimal in many ways:

  • appendRecords has async interface, thus adding async stages under such an interface can be done without inspection and understanding all callers (that's what an interface is -- any compliant implementation is valid), but doing so will break the current logic (so from the proper interface usage perspective it is a bug in the caller, which this proposal fixes)
  • we cannot release new transaction protocol (or new coordinator) without implementing new logic, which makes hard dependencies and pushes against timelines (now all of a sudden KIP-848 got a new work to do before release, just because there is some independent work is going on in transaction area)
  • KIP-890 part2 design is still under discussion, the verification protocol is likely to change, so any changes in KIP-890 protocol are going to have ripple effects on KIP-848
  • 2 fairly complex components are now tied together -- we cannot just innovate on transaction protocol implementation details (or to be broader -- on the whole IO subsystem implementation details -- e.g. Async IO) without understanding group coordinator implementation detail and we cannot innovate on group coordinator implementation detail without understanding implementation details of transaction protocol
  • to make the previous point worse, the dependency is not visible at the "point of use" -- someone tasked with improving transaction protocol (or IO in general) would have no indication from the appendRecords interface, that adding an async stage would need to have a corresponding change in group coordinator
  • the work needs to be duplicated in group coordinator (and the protocol is going slightly different for different client versions) which becomes a likely source of bugs

IMO, the fact that transaction verification implementation just doesn't work out-of-box with the new group coordinator (and in fact requires quite non-trivial follow-up work that will block the release) is an architectural issue. We should strive to make the system more decoupled, so that the context an engineer needs to understand to make local changes in a part of system is less.

Each new group coordinator thread handles requests from multiple groups and multiple clients within the same group.

I don't think it's bound to a thread, but indeed the concurrency is limited to partition -- we don't let operations on the same partition run concurrently, so all the groups that are mapped to the same partition are contending. This is, however, a specific implementation choice, it should be possible to make a group to be a unit of concurrency, and if that's not enough, we can let offset commits for different partitions go concurrently as well (they just need to make sure that group doesn't change, which is sort of a "read lock" on the group), at which point there probably wouldn't be any contention in the common path.

Now, one might ask a question, implementing per-group synchronization adds complexity and handling transaction verification as an explicit state transition in group coordinator adds complexity, what the difference? I'd say the difference is fundamental -- per-group synchronization complexity is encapsulated in one component and keeps the system decoupled: an engineer tasked to improve transaction protocol, doesn't need understand implementation details of group coordinator and vice versa. Changes are smaller, can be made faster, and less bug prone. Win-win-win.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for looking into this. Here is my take:

That is correct, it may become a perf problem

I strongly disagree on blocking the event loop. It will not become a perf problem. It is one. It is also an anti-pattern.

Right now it is a functional problem

It is technically not a functional problem, at least not yet, because I haven't not implemented the transactional offset commit in the new coordinator. ;)

appendRecords has async interface, thus adding async stages under such an interface can be done without inspection and understanding all callers (that's what an interface is -- any compliant implementation is valid), but doing so will break the current logic (so from the proper interface usage perspective it is a bug in the caller, which this proposal fixes)

I will change this to not use appendRecords, this will make the contract clear.

now all of a sudden KIP-848 got a new work to do before release, just because there is some independent work is going on in transaction area

This is incorrect. We knew about this and we always had an implementation in mind which works. I will basically decouple the write in two stages: 1) validate/prepare the transaction; and 2) update state and write. As we discussed in the other PR, this is also required for the old coordinator to work correctly.

KIP-890 part2 design is still under discussion, the verification protocol is likely to change, so any changes in KIP-890 protocol are going to have ripple effects on KIP-848

I don't agree with this. As we just saw, we already failed to make it work correctly for the existing coordinator so the dependency was already there. Again, we can do better, I agree.

the work needs to be duplicated in group coordinator (and the protocol is going slightly different for different client versions) which becomes a likely source of bugs

This is completely unrelated in my opinion as this is true for both the old and the new coordinator.

Overall, I agree that we could do better but I think that it is not the right time to change this. We are already under high time pressure and actually changing this in the right way puts even more pressure. We should look for a proper solution afterwards.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is completely unrelated in my opinion as this is true for both the old and the new coordinator.

It's true that it's a problem with the old coordinator, and we should make the whatever minimal fixes required for the old coordinator to work (and if it happens to work end-to-end, which I think it might, we won't need to fix it), but that code is going away and shouldn't define the forward-looking architecture.

As we build the new coordinator, we should build it in a way that improves forward-looking architecture. Keeping the right abstraction is good, coincidentally it helps with the timelines -- we can use this proposal and use the work that already has been done instead of doing new work of bringing implementation details into group coordinator.

Moreover, I wonder if we need yet another thread pool to handle group coordinator logic, I think it would be good to just re-use the request handler threads to run this functionality. This would avoid thread pools proliferation and also reuse various useful improvements that work only on request pool threads, e.g. RequestLocal (hopefully we'll make it into a real thread local to be used at the point of use instead of passing the argument), various observability things, etc. Here is a PoC that does that using NonBlockingSynchronizer and KafkaRequestHandler.wrap

46acf02

The NonBlockingSynchronizer replaces EventAccumulator and MultiThreadedEventProcessor (I didn't remove them to keep the change small), it has some perf benefits e.g. in uncontended cases, the processing continues running on the request thread instead of being rescheduled on the gc thread pool. I can also easily implement read-write synchronization for the NonBlockingSynchronizer (so that readers won't block each other out), e.g. to implement non-blocking read "lock" on group when committing offsets.

It's not to say I don't like the current code, but it feels like we re-building functionality that we already have elsewhere in Kafka and we we could re-use the existing building blocks so that the gc focuses on group coordination rather than managing thread pools, getting into the details of transactional protocol, etc.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest, if we implement proper concurrency granularity for groups (serialize group updates [not whole partition], keep read "lock" on groups during commit updates) I'm not sure if we'd get much extra perf gain from piercing the appendRecords abstraction to implement pipelining. Then we could get rid of the timeline snapshot structure and hooking into replication pipeline to listen for HWM updates; we could just do appendRecords and wait for completion. Then we could completely decouple group coordinator logic from the storage stack and make it simpler.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another potential issue is the ordering. withActiveContextOrThrow holds a partition level lock to make sure the record is replayed in the state machine in the same order as it's appended to the log. With withActiveContextOrThrowAsync, we hold the lock to replay the record, but appends to the log without the lock. The could create a situation that the state machine may not be exactly recreated by replaying records from the log.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The locking model is not changed -- it holds the lock around the whole call, see line 1241

result = asyncFunc.apply(context).whenComplete((none, t) -> context.lock.unlock());

the .whenComplete callback will execute after the function is complete, so lock is held around the whole thing.

The unlock in the finally clause is so that if we asyncFunc.apply throws an exception (which would happen if the function in fact is executed synchronously) and we didn't get the future, then we unlock inline.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, Artem. Got it.

@github-actions
Copy link
Copy Markdown

This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch)

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

@github-actions github-actions Bot added the stale Stale PRs label Feb 15, 2024
@github-actions
Copy link
Copy Markdown

github-actions Bot commented Jan 4, 2025

This PR has been closed since it has not had any activity in 120 days. If you feel like this
was a mistake, or you would like to continue working on it, please feel free to re-open the
PR and ask for a review.

@github-actions github-actions Bot added the closed-stale PRs that were closed due to inactivity label Jan 4, 2025
@github-actions github-actions Bot closed this Jan 4, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

closed-stale PRs that were closed due to inactivity stale Stale PRs

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants