
KAFKA-14505; [6/N] Avoid rescheduling callback in request thread #15176

Merged: dajac merged 2 commits into apache:trunk from dajac:KAFKA-14505-6 on Jan 31, 2024

Conversation

@dajac dajac commented Jan 11, 2024

This patch removes the extra hop via the request thread when the new group coordinator verifies a transaction. Prior to this change, the ReplicaManager would automatically re-schedule the callback onto a request thread. However, the new group coordinator does not need this, as it already schedules the write on its own thread. With this patch, the decision whether or not to re-schedule on a request thread is left to the caller.
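To make the idea concrete, here is a minimal, self-contained sketch in Scala (toy types and executors, not Kafka code) of the pattern described above: the verification API invokes a plain callback, and each caller decides whether a hop to a request handler thread is needed.

```scala
import java.util.concurrent.Executors

object CallerDecidesSketch {
  // Stand-ins for the request handler thread pool and the coordinator runtime's event loop.
  private val requestHandlerPool = Executors.newFixedThreadPool(2)
  private val coordinatorEventLoop = Executors.newSingleThreadExecutor()

  // The shared API: it simply invokes the callback on whatever thread verification completes.
  def startVerification(callback: Either[String, Long] => Unit): Unit =
    callback(Right(42L)) // pretend verification produced some guard token

  def main(args: Array[String]): Unit = {
    // Produce path: the caller re-schedules the callback onto a request handler thread.
    startVerification(result => requestHandlerPool.execute(() => println(s"produce path: $result")))

    // New group coordinator: no extra hop; it schedules its follow-up work on its own event loop.
    startVerification(result => coordinatorEventLoop.execute(() => println(s"coordinator path: $result")))

    requestHandlerPool.shutdown()
    coordinatorEventLoop.shutdown()
  }
}
```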

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

  baseSequence: Int,
  requestLocal: RequestLocal,
- callback: (Errors, RequestLocal, VerificationGuard) => Unit
+ callback: Either[Errors, VerificationGuard] => Unit
Member Author

I had to change the callback because KafkaRequestHandler.wrapAsyncCallback only supports wrapping unary functions.
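For illustration, a minimal sketch of that constraint, assuming a wrapper shape like the one below (it is a stand-in, not the real KafkaRequestHandler.wrapAsyncCallback): a wrapper that defers a callback to another thread can only carry one value, so the three-argument callback is collapsed into a unary Either[Errors, VerificationGuard] => Unit.

```scala
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.storage.internals.log.VerificationGuard

object UnaryCallbackSketch {
  // Assumed wrapper shape: it can only re-schedule a function of a single argument,
  // because the deferred invocation has to carry exactly one value.
  def wrapUnary[T](fn: T => Unit, reschedule: Runnable => Unit): T => Unit =
    t => reschedule(() => fn(t))

  // Collapsing (Errors, VerificationGuard) into Either keeps the callback unary,
  // so it stays compatible with such a wrapper.
  val callback: Either[Errors, VerificationGuard] => Unit = {
    case Left(error)  => println(s"verification failed: $error")
    case Right(guard) => println(s"verification guard: $guard")
  }

  def main(args: Array[String]): Unit = {
    // Defer the unary callback to a throwaway thread via the wrapper.
    val wrapped = wrapUnary(callback, runnable => new Thread(runnable).start())
    wrapped(Right(VerificationGuard.SENTINEL))
  }
}
```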

@dajac dajac added the KIP-848 The Next Generation of the Consumer Rebalance Protocol label Jan 11, 2024
@dajac dajac requested a review from jolshan January 11, 2024 15:33

dajac commented Jan 11, 2024

@artemlivshits @jolshan Could you please take a look at this one when you get a chance? Let me know what you think.

- preAppendErrors.getOrElse(topicPartition, Errors.NONE),
- newRequestLocal,
- verificationGuards.getOrElse(topicPartition, VerificationGuard.SENTINEL))
+ def generalizedCallback(results: Map[TopicPartition, Either[Errors, VerificationGuard]]): Unit = {
Contributor

I think we could do the translation from preAppendErrors, newRequestLocal, and verificationGuards here; then we'd avoid propagating the changes all the way to the replication layer.

Member Author

Yeah, I was debating this. The reasoning for not doing it here is that @jolshan may also use maybeStartTransactionVerificationForPartitions from the core path in conjunction with wrapAsyncCallback, so she will also need a unary callback. @jolshan Is my understanding correct? Otherwise, we could limit the change to this method.

Member

Please take a look at my refactor PR; I have done this to some extent.
I'd prefer not to overhaul it again (as I did after the previous group coordinator change).
Hopefully it makes this work easier too.


Member Author

Ack. I will take a look at #15087.

- newRequestLocal,
- verificationGuards.getOrElse(topicPartition, VerificationGuard.SENTINEL))
+ def generalizedCallback(results: Map[TopicPartition, Either[Errors, VerificationGuard]]): Unit = {
+   callback(results.getOrElse(topicPartition, Right(VerificationGuard.SENTINEL)))
Contributor

This logic is just a translation of the current implementation (so it's not introducing anything new), but is it expected that we might not get a result for the requested topicPartition? Should we log a warning, so that we know we're hitting some unexpected code path?

Member Author

It should not happen, but who knows. Let me log an error and fail the callback if it ever happens.
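A hedged sketch of that handling (the method name, captured values, and the placeholder error code are assumptions for illustration; the merged code may differ):

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.storage.internals.log.VerificationGuard

object MissingResultSketch {
  // `topicPartition` and `callback` stand in for the values captured by the real
  // per-partition callback; Errors.INVALID_TXN_STATE is only a placeholder error code.
  def complete(
    topicPartition: TopicPartition,
    results: Map[TopicPartition, Either[Errors, VerificationGuard]],
    callback: Either[Errors, VerificationGuard] => Unit
  ): Unit = {
    results.get(topicPartition) match {
      case Some(result) => callback(result)
      case None =>
        // Log loudly and fail the callback instead of silently falling back to the sentinel guard.
        System.err.println(s"Expected a verification result for $topicPartition but found none.")
        callback(Left(Errors.INVALID_TXN_STATE))
    }
  }
}
```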

  producerEpoch: Short,
  requestLocal: RequestLocal,
- callback: (Map[TopicPartition, Errors], RequestLocal, Map[TopicPartition, VerificationGuard]) => Unit
+ callback: mutable.Map[TopicPartition, Either[Errors, VerificationGuard]] => Unit
Contributor

My understanding is that once we refactor these changes, this function could be called either from the GC code path (which may not care about requestLocal) or from the core data path, which needs requestLocal because the callback may be called immediately in this thread's context.

Member Author

This is correct. The requestLocal is not required within this function, though. If the caller uses wrapAsyncCallback, it will get the correct one to use.
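A toy sketch of that point (stand-in types; the wrapper shape is an assumption, not the real API): the wrapper closes over the caller's RequestLocal and supplies it when the wrapped callback eventually runs, so the verification function itself never needs to see it.

```scala
object RequestLocalSketch {
  // Toy stand-in for Kafka's RequestLocal.
  final case class RequestLocal(name: String)

  // Assumed wrapper shape: it captures the caller's RequestLocal and hands it to the
  // two-argument function when the callback fires. (The real wrapper also re-schedules
  // onto a request handler thread.)
  def wrapAsyncCallback[T](fn: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit =
    t => fn(requestLocal, t)

  def main(args: Array[String]): Unit = {
    val requestLocal = RequestLocal("caller-request-local")

    // The verification function only ever sees this unary callback...
    val callback: String => Unit =
      wrapAsyncCallback[String]((local, result) => println(s"$result handled with $local"), requestLocal)

    // ...so it can invoke the callback without knowing anything about RequestLocal.
    callback("verification-result")
  }
}
```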

- postVerificationCallback
+ // Wrap the callback to be handled on an arbitrary request handler thread
+ // when transaction verification is complete.
+ KafkaRequestHandler.wrapAsyncCallback(postVerificationCallback, requestLocal)
Contributor

It's interesting to note (I don't think we need to change anything) that we'll now have a production code path (and not just unit tests) where the wrapped callback can be called on the same request thread, going through the optimized code path where we call the callback directly.
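A toy illustration of that behaviour (not the real wrapper; the thread-local check below is an assumption about its shape): if the wrapped callback fires while already on a request handler thread, it is invoked directly instead of being re-enqueued.

```scala
object SameThreadSketch {
  // Marks whether the current thread is (pretend) a request handler thread.
  private val onRequestThread = new ThreadLocal[Boolean] {
    override def initialValue(): Boolean = false
  }

  def wrap[T](fn: T => Unit, enqueue: Runnable => Unit): T => Unit = { t =>
    if (onRequestThread.get()) fn(t)   // optimized path: already on a request thread, call directly
    else enqueue(() => fn(t))          // otherwise hop onto a request handler thread
  }

  def main(args: Array[String]): Unit = {
    val wrapped = wrap[String](s => println(s"handled $s"), r => new Thread(r).start())
    wrapped("not on a request thread") // enqueued
    onRequestThread.set(true)
    wrapped("on a request thread")     // called directly
  }
}
```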

Member

Why are we calling this here? I thought we wanted to avoid this wrap here and only do it for produce requests.

Member Author

We still need this in the old coordinator.

Member

Oh sorry. I guess I was just confused because I didn't see it in the replica manager flow (for produce).


dajac commented Jan 29, 2024

@jolshan I reworked the PR based on #15087. It is quite different from the previous one. Please take a look when you get a chance.

Comment thread on core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala (outdated)
Comment thread on core/src/main/scala/kafka/server/ReplicaManager.scala

dajac commented Jan 30, 2024

Thanks @jolshan. I have addressed your comments.

@dajac dajac requested a review from jolshan January 30, 2024 08:28

@jolshan jolshan left a comment


thanks!

@dajac dajac merged commit 6dd517d into apache:trunk Jan 31, 2024
@dajac dajac deleted the KAFKA-14505-6 branch January 31, 2024 07:27
yyu1993 pushed a commit to yyu1993/kafka that referenced this pull request Feb 15, 2024
KAFKA-14505; [6/N] Avoid rescheduling callback in request thread (apache#15176)

This patch removes the extra hop via the request thread when the new group coordinator verifies a transaction. Prior to it, the ReplicaManager would automatically re-schedule the callback to a request thread. However, the new group coordinator does not need this as it already schedules the write into its own thread. With this patch, the decision to re-schedule on a request thread or not is left to the caller.

Reviewers: Artem Livshits <alivshits@confluent.io>, Justine Olshan <jolshan@confluent.io>
clolov pushed a commit to clolov/kafka that referenced this pull request Apr 5, 2024
Phuc-Hong-Tran pushed a commit to Phuc-Hong-Tran/kafka that referenced this pull request Jun 6, 2024
