1 change: 1 addition & 0 deletions build.gradle
@@ -1314,6 +1314,7 @@ project(':group-coordinator') {
implementation project(':server-common')
implementation project(':clients')
implementation project(':metadata')
implementation project(':storage')
implementation libs.slf4jApi
implementation libs.metrics

1 change: 1 addition & 0 deletions checkstyle/import-control.xml
@@ -248,6 +248,7 @@
<allow pkg="org.apache.kafka.server.common"/>
<allow pkg="org.apache.kafka.server.record"/>
<allow pkg="org.apache.kafka.server.util"/>
<allow pkg="org.apache.kafka.storage.internals.log"/>
<allow pkg="org.apache.kafka.test" />
<allow pkg="org.apache.kafka.timeline" />
<subpackage name="metrics">
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
@@ -17,7 +17,7 @@
package kafka.coordinator.group

import kafka.cluster.PartitionListener
import kafka.server.{ActionQueue, ReplicaManager}
import kafka.server.{ActionQueue, ReplicaManager, RequestLocal}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.RecordTooLargeException
import org.apache.kafka.common.protocol.Errors
@@ -27,9 +27,10 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.coordinator.group.runtime.PartitionWriter
import org.apache.kafka.storage.internals.log.AppendOrigin
import org.apache.kafka.storage.internals.log.VerificationGuard

import java.util
import java.util.concurrent.CompletableFuture
import scala.collection.Map

/**
@@ -81,10 +82,7 @@ class CoordinatorPartitionWriter[T](
}

/**
* Register a PartitionWriter.Listener.
*
* @param tp The partition to register the listener to.
* @param listener The listener.
* {@inheritDoc}
*/
override def registerListener(
tp: TopicPartition,
@@ -94,10 +92,7 @@
}

/**
* Deregister a PartitionWriter.Listener.
*
* @param tp The partition to deregister the listener from.
* @param listener The listener.
* {@inheritDoc}
*/
override def deregisterListener(
tp: TopicPartition,
@@ -107,20 +102,13 @@
}

/**
* Write records to the partitions. Records are written in one batch so
* atomicity is guaranteed.
*
* @param tp The partition to write records to.
* @param producerId The producer id.
* @param producerEpoch The producer epoch.
* @param records The list of records. The records are written in a single batch.
* @return The log end offset right after the written records.
* @throws KafkaException Any KafkaException caught during the write operation.
* {@inheritDoc}
*/
override def append(
tp: TopicPartition,
producerId: Long,
producerEpoch: Short,
verificationGuard: VerificationGuard,
records: util.List[T]
): Long = {
if (records.isEmpty) throw new IllegalStateException("records must be non-empty.")
@@ -161,7 +149,7 @@
s"in append to partition $tp which exceeds the maximum configured size of $maxBatchSize.")
}

internalAppend(tp, recordsBuilder.build())
internalAppend(tp, recordsBuilder.build(), verificationGuard)
} finally {
bufferSupplier.release(buffer)
}
@@ -172,15 +160,7 @@
}

/**
* Write the transaction end marker.
*
* @param tp The partition to write records to.
* @param producerId The producer id.
* @param producerEpoch The producer epoch.
* @param coordinatorEpoch The epoch of the transaction coordinator.
* @param result The transaction result.
* @return The log end offset right after the written records.
* @throws KafkaException Any KafkaException caught during the write operation.
* {@inheritDoc}
*/
override def appendEndTransactionMarker(
tp: TopicPartition,
@@ -201,18 +181,48 @@
))
}

/**
* {@inheritDoc}
*/
override def maybeStartTransactionVerification(
tp: TopicPartition,
transactionalId: String,
producerId: Long,
producerEpoch: Short
): CompletableFuture[VerificationGuard] = {
val future = new CompletableFuture[VerificationGuard]()
replicaManager.maybeStartTransactionVerificationForPartition(
topicPartition = tp,
transactionalId = transactionalId,
producerId = producerId,
producerEpoch = producerEpoch,
baseSequence = RecordBatch.NO_SEQUENCE,
requestLocal = RequestLocal.NoCaching,
callback = (error, _, verificationGuard) => {
if (error != Errors.NONE) {
future.completeExceptionally(error.exception)
} else {
future.complete(verificationGuard)
}
}
)
future
}

private def internalAppend(
tp: TopicPartition,
memoryRecords: MemoryRecords
memoryRecords: MemoryRecords,
verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
): Long = {
var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
replicaManager.appendRecords(
replicaManager.appendForGroup(
timeout = 0L,
requiredAcks = 1,
internalTopicsAllowed = true,
origin = AppendOrigin.COORDINATOR,
entriesPerPartition = Map(tp -> memoryRecords),
responseCallback = results => appendResults = results,
requestLocal = RequestLocal.NoCaching,
verificationGuards = Map(tp -> verificationGuard),
delayedProduceLock = None,
// We can directly complete the purgatories here because we don't hold
// any conflicting locks.
actionQueue = directActionQueue
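
Taken together, the writer changes split a transactional write into two steps: the caller first obtains a VerificationGuard from maybeStartTransactionVerification, then passes that guard into append so the log layer can confirm at write time that the verification still holds. A minimal sketch of the sequence against the PartitionWriter interface (the helper name is illustrative, and the blocking get() stands in for the asynchronous chaining the coordinator runtime would actually do):

    import java.util
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.coordinator.group.runtime.PartitionWriter
    import org.apache.kafka.storage.internals.log.VerificationGuard

    // Illustrative only: verify the transaction, then append under the guard.
    def verifiedAppend[T](
      writer: PartitionWriter[T],
      tp: TopicPartition,
      transactionalId: String,
      producerId: Long,
      producerEpoch: Short,
      records: util.List[T]
    ): Long = {
      val guard: VerificationGuard = writer
        .maybeStartTransactionVerification(tp, transactionalId, producerId, producerEpoch)
        .get() // blocking only to keep the sketch short
      // Non-transactional writes skip verification via VerificationGuard.SENTINEL,
      // the default that internalAppend above falls back to.
      writer.append(tp, producerId, producerEpoch, guard, records)
    }
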
6 changes: 4 additions & 2 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -989,6 +989,7 @@ class ReplicaManager(val config: KafkaConfig,
* @param requestLocal container for the stateful instances scoped to this request -- this must correspond to the
* thread calling this method
* @param verificationGuards the mapping from topic partition to verification guards if transaction verification is used
* @param actionQueue the action queue to use
*/
def appendForGroup(
timeout: Long,
@@ -997,7 +998,8 @@
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
delayedProduceLock: Option[Lock],
requestLocal: RequestLocal,
verificationGuards: Map[TopicPartition, VerificationGuard]
verificationGuards: Map[TopicPartition, VerificationGuard],
actionQueue: ActionQueue = this.defaultActionQueue
): Unit = {
if (!isValidRequiredAcks(requiredAcks)) {
sendInvalidRequiredAcksResponse(entriesPerPartition, responseCallback)
@@ -1025,7 +1027,7 @@
val allResults = localProduceResults
val produceStatus = buildProducePartitionStatus(allResults)

addCompletePurgatoryAction(defaultActionQueue, allResults)
addCompletePurgatoryAction(actionQueue, allResults)

maybeAddDelayedProduce(
requiredAcks,
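
The new actionQueue parameter defaults to this.defaultActionQueue, so existing callers of appendForGroup are unchanged, while the coordinator writer can hand in its directActionQueue and complete purgatories on the calling thread. The difference between the two strategies is roughly the following (a simplified sketch; the real trait is kafka.server.ActionQueue, and the class names here are hypothetical):

    // Simplified sketch of the two queueing strategies; names are made up.
    trait SimpleActionQueue {
      def add(action: () => Unit): Unit
      def tryCompleteActions(): Unit
    }

    // Deferred (the default): actions pile up and are drained later by a
    // request handler thread calling tryCompleteActions().
    class DeferredQueue extends SimpleActionQueue {
      private val actions = new java.util.concurrent.ConcurrentLinkedQueue[() => Unit]()
      def add(action: () => Unit): Unit = actions.add(action)
      def tryCompleteActions(): Unit = {
        var a = actions.poll()
        while (a != null) { a(); a = actions.poll() }
      }
    }

    // Direct (what the coordinator passes): each action runs immediately on
    // the calling thread. As the comment in internalAppend notes, this is
    // safe only because the caller holds no conflicting locks at that point.
    class DirectQueue extends SimpleActionQueue {
      def add(action: () => Unit): Unit = action()
      def tryCompleteActions(): Unit = ()
    }
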
core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -215,7 +215,8 @@ object AbstractCoordinatorConcurrencyTest {
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
delayedProduceLock: Option[Lock] = None,
requestLocal: RequestLocal = RequestLocal.NoCaching,
verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty): Unit = {
verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty,
actionQueue: ActionQueue = null): Unit = {
appendRecords(timeout, requiredAcks, true, AppendOrigin.COORDINATOR, entriesPerPartition, responseCallback,
delayedProduceLock, requestLocal = requestLocal)
}