Skip to content

KAFKA-15987: Refactor ReplicaManager code for transaction verification#15087

Merged
jolshan merged 21 commits intoapache:trunkfrom
jolshan:kafka-15987
Jan 26, 2024
Merged

KAFKA-15987: Refactor ReplicaManager code for transaction verification#15087
jolshan merged 21 commits intoapache:trunkfrom
jolshan:kafka-15987

Conversation

@jolshan
Copy link
Copy Markdown
Member

@jolshan jolshan commented Dec 28, 2023

I originally did some refactors in #14774, but we decided to keep the changes minimal since the ticket was a blocker. Here are those refactors:

  • Removed separate append paths so that produce, group coordinator, and other append paths all call appendRecords
  • appendRecords has been simplified
  • Removed unneeded error conversions in verification code since group coordinator and produce path convert errors differently, removed test for that
  • Fixed incorrect capital param name in KafkaRequestHandler
  • Updated ReplicaManager test to handle transaction verification appends separately. (I may revise this)

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala Outdated
@jolshan jolshan marked this pull request as ready for review January 8, 2024 23:59
dajac added a commit that referenced this pull request Jan 11, 2024
This patch wires the transaction verification in the new group coordinator. It basically calls the verification path before scheduling the write operation. If the verification fails, the error is returned to the caller.

Note that the patch uses `appendForGroup`. I suppose that we will move away from using it when #15087 is merged.

Reviewers: Justine Olshan <jolshan@confluent.io>
Copy link
Copy Markdown
Member

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jolshan Thanks for the PR. Overall, the patch looks pretty good. I left a few initial comments. I need to re-read the appendRecords again. I will do it tomorrow.

Comment thread core/src/main/scala/kafka/server/KafkaApis.scala Outdated
Comment thread core/src/main/scala/kafka/server/ReplicaManager.scala
Comment on lines -1191 to -1201
// Map transaction coordinator errors to known errors for the response
val convertedErrors = verificationErrors.map { case (tp, error) =>
error match {
case Errors.CONCURRENT_TRANSACTIONS |
Errors.COORDINATOR_LOAD_IN_PROGRESS |
Errors.COORDINATOR_NOT_AVAILABLE |
Errors.NOT_COORDINATOR => tp -> Errors.NOT_ENOUGH_REPLICAS
case _ => tp -> error
}

}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my understanding, we remove this here and we adds it back in handleProduceAppend and we rely on the conversion in the group coordinator. Did I get it right? In the group coordinator, we don't handle CONCURRENT_TRANSACTIONS, I think. I need to double check.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have separate handling for produce requests and txn offset commit requests.

for produce:

              case Errors.INVALID_TXN_STATE => Some(error.exception("Partition was not added to the transaction"))
              case Errors.CONCURRENT_TRANSACTIONS |
                   Errors.COORDINATOR_LOAD_IN_PROGRESS |
                   Errors.COORDINATOR_NOT_AVAILABLE |
                   Errors.NOT_COORDINATOR => Some(new NotEnoughReplicasException(
                s"Unable to verify the partition has been added to the transaction. Underlying error: ${error.toString}"))
              case _ => None

for txn offset commit:

    error match {
      case Errors.UNKNOWN_TOPIC_OR_PARTITION
           | Errors.NOT_ENOUGH_REPLICAS
           | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
        Errors.COORDINATOR_NOT_AVAILABLE

      case Errors.NOT_LEADER_OR_FOLLOWER
           | Errors.KAFKA_STORAGE_ERROR =>
        Errors.NOT_COORDINATOR

      case Errors.MESSAGE_TOO_LARGE
           | Errors.RECORD_LIST_TOO_LARGE
           | Errors.INVALID_FETCH_SIZE =>
        Errors.INVALID_COMMIT_OFFSET_SIZE

      // We may see INVALID_TXN_STATE or INVALID_PID_MAPPING here due to transaction verification.
      // They can be returned without mapping to a new error.
      case other => other
    }

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But yes, we simply pass through concurrent txns which will be fatal to the client.


@ParameterizedTest
@EnumSource(value = classOf[Errors], names = Array("NOT_COORDINATOR", "CONCURRENT_TRANSACTIONS", "COORDINATOR_LOAD_IN_PROGRESS", "COORDINATOR_NOT_AVAILABLE"))
def testMaybeVerificationErrorConversions(error: Errors): Unit = {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to keep this one as we still have those conversion but in a different place now?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have the test above in this file.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also have one for the GroupCoordinator.

Comment thread core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala Outdated
showuon pushed a commit to showuon/kafka that referenced this pull request Jan 22, 2024
This patch wires the transaction verification in the new group coordinator. It basically calls the verification path before scheduling the write operation. If the verification fails, the error is returned to the caller.

Note that the patch uses `appendForGroup`. I suppose that we will move away from using it when apache#15087 is merged.

Reviewers: Justine Olshan <jolshan@confluent.io>
Copy link
Copy Markdown
Member

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks @jolshan!

@jolshan jolshan merged commit 5eb8201 into apache:trunk Jan 26, 2024
yyu1993 pushed a commit to yyu1993/kafka that referenced this pull request Feb 15, 2024
This patch wires the transaction verification in the new group coordinator. It basically calls the verification path before scheduling the write operation. If the verification fails, the error is returned to the caller.

Note that the patch uses `appendForGroup`. I suppose that we will move away from using it when apache#15087 is merged.

Reviewers: Justine Olshan <jolshan@confluent.io>
yyu1993 pushed a commit to yyu1993/kafka that referenced this pull request Feb 15, 2024
apache#15087)

I originally did some refactors in apache#14774, but we decided to keep the changes minimal since the ticket was a blocker. Here are those refactors:

* Removed separate append paths so that produce, group coordinator, and other append paths all call appendRecords
* AppendRecords has been simplified
* Removed unneeded error conversions in verification code since group coordinator and produce path convert errors differently, removed test for that
* Fixed incorrect capital param name in KafkaRequestHandler
* Updated ReplicaManager test to handle produce appends separately when transactions are used.

Reviewers: David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io>
clolov pushed a commit to clolov/kafka that referenced this pull request Apr 5, 2024
This patch wires the transaction verification in the new group coordinator. It basically calls the verification path before scheduling the write operation. If the verification fails, the error is returned to the caller.

Note that the patch uses `appendForGroup`. I suppose that we will move away from using it when apache#15087 is merged.

Reviewers: Justine Olshan <jolshan@confluent.io>
clolov pushed a commit to clolov/kafka that referenced this pull request Apr 5, 2024
apache#15087)

I originally did some refactors in apache#14774, but we decided to keep the changes minimal since the ticket was a blocker. Here are those refactors:

* Removed separate append paths so that produce, group coordinator, and other append paths all call appendRecords
* AppendRecords has been simplified
* Removed unneeded error conversions in verification code since group coordinator and produce path convert errors differently, removed test for that
* Fixed incorrect capital param name in KafkaRequestHandler
* Updated ReplicaManager test to handle produce appends separately when transactions are used.

Reviewers: David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io>
Phuc-Hong-Tran pushed a commit to Phuc-Hong-Tran/kafka that referenced this pull request Jun 6, 2024
This patch wires the transaction verification in the new group coordinator. It basically calls the verification path before scheduling the write operation. If the verification fails, the error is returned to the caller.

Note that the patch uses `appendForGroup`. I suppose that we will move away from using it when apache#15087 is merged.

Reviewers: Justine Olshan <jolshan@confluent.io>
Phuc-Hong-Tran pushed a commit to Phuc-Hong-Tran/kafka that referenced this pull request Jun 6, 2024
apache#15087)

I originally did some refactors in apache#14774, but we decided to keep the changes minimal since the ticket was a blocker. Here are those refactors:

* Removed separate append paths so that produce, group coordinator, and other append paths all call appendRecords
* AppendRecords has been simplified
* Removed unneeded error conversions in verification code since group coordinator and produce path convert errors differently, removed test for that
* Fixed incorrect capital param name in KafkaRequestHandler
* Updated ReplicaManager test to handle produce appends separately when transactions are used.

Reviewers: David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io>
hasCustomErrorMessage = customException.isDefined
)
}
val entriesWithoutErrorsPerPartition = entriesPerPartition.filter { case (key, _) => !errorResults.contains(key) }
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for raising a question on this merged PR. I just have one small concern: in this non-transaction path, errorResults is always empty, so there is no need to recreate a map collection, since entriesWithoutErrorsPerPartition is identical to entriesPerPartition. Did I misunderstand anything? If not, I can file a minor PR to improve it

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we could probably only do this filtering if the errorResults is nonEmpty.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I open the PR to addressed this improvement #20410

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants