Skip to content
Merged
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
<suppress id="dontUseSystemExit"
files="Exit.java"/>
<suppress checks="ClassFanOutComplexity"
files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManager|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin|KafkaRaftClient|KafkaRaftClientTest).java"/>
files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|Utils|TransactionManager|TransactionManagerTest|KafkaAdminClient|NetworkClient|Admin|KafkaRaftClient|KafkaRaftClientTest|RaftClientTestContext).java"/>
<suppress checks="ClassFanOutComplexity"
files="(SaslServerAuthenticator|SaslAuthenticatorTest).java"/>
<suppress checks="ClassFanOutComplexity"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Message;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.protocol.SendBuilder;
Expand Down Expand Up @@ -102,8 +101,6 @@ public final Send toSend(RequestHeader header) {
return SendBuilder.buildRequestSend(header, data());
}

public abstract Message data();

// Visible for testing
public final ByteBuffer serialize() {
return MessageUtil.toByteBuffer(data(), version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,9 @@
*/
package org.apache.kafka.common.requests;

public interface AbstractRequestResponse { }
import org.apache.kafka.common.protocol.ApiMessage;

/**
 * Common base type shared by request and response objects. It exposes the
 * auto-generated protocol message backing the request or response, which
 * callers use for serialization (see {@code MessageUtil.toByteBuffer(data(), version)}
 * in the concrete subclasses).
 */
public interface AbstractRequestResponse {

    /**
     * @return the auto-generated {@link ApiMessage} holding this request's or
     *         response's protocol fields
     */
    ApiMessage data();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Message;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.common.protocol.SendBuilder;

Expand Down Expand Up @@ -90,8 +89,6 @@ protected void updateErrorCounts(Map<Errors, Integer> errorCounts, Errors error)
errorCounts.put(error, count + 1);
}

protected abstract Message data();

/**
* Parse a response from the provided buffer. The buffer is expected to hold both
* the {@link ResponseHeader} as well as the response payload.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.LogDirNotFoundException;
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.apache.kafka.common.errors.TopicExistsException;
Expand All @@ -75,9 +75,9 @@
import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData.AlterReplicaLogDirTopicResult;
import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.CreateAclsResponseData;
import org.apache.kafka.common.message.CreatePartitionsResponseData;
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult;
import org.apache.kafka.common.message.CreateAclsResponseData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResultCollection;
Expand Down Expand Up @@ -108,16 +108,16 @@
import org.apache.kafka.common.message.ListOffsetsResponseData;
import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition;
import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic;
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartition;
import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection;
import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopic;
import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.Message;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;
import org.apache.kafka.common.quota.ClientQuotaFilter;
Expand Down Expand Up @@ -4937,7 +4937,7 @@ public Map<Errors, Integer> errorCounts() {
}

@Override
protected Message data() {
public ApiMessage data() {
return null;
}

Expand Down
55 changes: 33 additions & 22 deletions core/src/main/scala/kafka/common/InterBrokerSendThread.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package kafka.common

import java.util.{ArrayDeque, ArrayList, Collection, Collections, HashMap, Iterator}
import java.util.Map.Entry
import java.util.{ArrayDeque, ArrayList, Collection, Collections, HashMap, Iterator}

import kafka.utils.ShutdownableThread
import org.apache.kafka.clients.{ClientRequest, ClientResponse, KafkaClient, RequestCompletionHandler}
Expand All @@ -32,17 +32,19 @@ import scala.jdk.CollectionConverters._
/**
* Class for inter-broker send thread that utilize a non-blocking network client.
*/
abstract class InterBrokerSendThread(name: String,
networkClient: KafkaClient,
time: Time,
isInterruptible: Boolean = true)
extends ShutdownableThread(name, isInterruptible) {
abstract class InterBrokerSendThread(
name: String,
networkClient: KafkaClient,
requestTimeoutMs: Int,
time: Time,
isInterruptible: Boolean = true
) extends ShutdownableThread(name, isInterruptible) {

def generateRequests(): Iterable[RequestAndCompletionHandler]
def requestTimeoutMs: Int
private val unsentRequests = new UnsentRequests

def hasUnsentRequests = unsentRequests.iterator().hasNext
def generateRequests(): Iterable[RequestAndCompletionHandler]

  // True if at least one request is queued (for any node) and not yet handed to the network client.
  // Implemented by probing the flattened unsent-request iterator rather than a size field.
  def hasUnsentRequests: Boolean = unsentRequests.iterator().hasNext
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth adding a size or isEmpty to UnsentRequests?


override def shutdown(): Unit = {
initiateShutdown()
Expand All @@ -51,23 +53,25 @@ abstract class InterBrokerSendThread(name: String,
awaitShutdown()
}

override def doWork(): Unit = {
var now = time.milliseconds()

private def drainGeneratedRequests(): Unit = {
generateRequests().foreach { request =>
val completionHandler = request.handler
unsentRequests.put(request.destination,
networkClient.newClientRequest(
request.destination.idString,
request.request,
now,
request.creationTimeMs,
true,
requestTimeoutMs,
completionHandler))
request.handler
))
}
}

protected def pollOnce(maxTimeoutMs: Long): Unit = {
try {
val timeout = sendRequests(now)
drainGeneratedRequests()
var now = time.milliseconds()
Comment thread
hachikuji marked this conversation as resolved.
Outdated
val timeout = sendRequests(now, maxTimeoutMs)
networkClient.poll(timeout, now)
now = time.milliseconds()
checkDisconnects(now)
Expand All @@ -85,8 +89,12 @@ abstract class InterBrokerSendThread(name: String,
}
}

private def sendRequests(now: Long): Long = {
var pollTimeout = Long.MaxValue
override def doWork(): Unit = {
pollOnce(Long.MaxValue)
}

private def sendRequests(now: Long, maxTimeoutMs: Long): Long = {
var pollTimeout = maxTimeoutMs
for (node <- unsentRequests.nodes.asScala) {
val requestIterator = unsentRequests.requestIterator(node)
while (requestIterator.hasNext) {
Expand Down Expand Up @@ -143,9 +151,12 @@ abstract class InterBrokerSendThread(name: String,
def wakeup(): Unit = networkClient.wakeup()
}

case class RequestAndCompletionHandler(destination: Node,
request: AbstractRequest.Builder[_ <: AbstractRequest],
handler: RequestCompletionHandler)
/**
 * A request produced by `generateRequests()` for the inter-broker send thread,
 * paired with the callback to invoke when the response arrives.
 *
 * @param creationTimeMs time (ms) the request was created; forwarded as the
 *                       creation time to `networkClient.newClientRequest`
 * @param destination    broker node the request should be sent to
 * @param request        builder for the request payload
 * @param handler        completion handler invoked with the client response
 */
case class RequestAndCompletionHandler(
  creationTimeMs: Long,
  destination: Node,
  request: AbstractRequest.Builder[_ <: AbstractRequest],
  handler: RequestCompletionHandler
)

private class UnsentRequests {
private val unsent = new HashMap[Node, ArrayDeque[ClientRequest]]
Expand Down Expand Up @@ -198,5 +209,5 @@ private class UnsentRequests {
requests.iterator
}

def nodes = unsent.keySet
  // The set of nodes that currently have an entry (a possibly empty queue) in the unsent-request map.
  def nodes: java.util.Set[Node] = unsent.keySet
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import kafka.api.KAFKA_2_8_IV0
import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
import kafka.metrics.KafkaMetricsGroup
import kafka.server.{KafkaConfig, MetadataCache}
import kafka.utils.{CoreUtils, Logging}
import kafka.utils.Implicits._
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.clients._
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network._
Expand All @@ -36,8 +36,8 @@ import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{Node, Reconfigurable, TopicPartition}

import scala.jdk.CollectionConverters._
import scala.collection.{concurrent, immutable}
import scala.jdk.CollectionConverters._

object TransactionMarkerChannelManager {
def apply(config: KafkaConfig,
Expand Down Expand Up @@ -127,11 +127,14 @@ class TxnMarkerQueue(@volatile var destination: Node) {
def totalNumMarkers(txnTopicPartition: Int): Int = markersPerTxnTopicPartition.get(txnTopicPartition).fold(0)(_.size)
}

class TransactionMarkerChannelManager(config: KafkaConfig,
metadataCache: MetadataCache,
networkClient: NetworkClient,
txnStateManager: TransactionStateManager,
time: Time) extends InterBrokerSendThread("TxnMarkerSenderThread-" + config.brokerId, networkClient, time) with Logging with KafkaMetricsGroup {
class TransactionMarkerChannelManager(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: (related to style question elsewhere) if we want to change the style of these class definitions, can we do it as a separate PR? I always find it difficult when style changes are conflated with logical changes

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's fair. I actually held myself back. I tried to only touch the cases that I was modifying anyway, but let me know if there are others. This one was especially obnoxious because of the long parameter list to InterBrokerSendThread.

config: KafkaConfig,
metadataCache: MetadataCache,
networkClient: NetworkClient,
txnStateManager: TransactionStateManager,
time: Time
) extends InterBrokerSendThread("TxnMarkerSenderThread-" + config.brokerId, networkClient, config.requestTimeoutMs, time)
with Logging with KafkaMetricsGroup {

this.logIdent = "[Transaction Marker Channel Manager " + config.brokerId + "]: "

Expand All @@ -145,17 +148,13 @@ class TransactionMarkerChannelManager(config: KafkaConfig,

private val transactionsWithPendingMarkers = new ConcurrentHashMap[String, PendingCompleteTxn]

override val requestTimeoutMs: Int = config.requestTimeoutMs

val writeTxnMarkersRequestVersion: Short =
if (config.interBrokerProtocolVersion >= KAFKA_2_8_IV0) 1
else 0

newGauge("UnknownDestinationQueueSize", () => markersQueueForUnknownBroker.totalNumMarkers)
newGauge("LogAppendRetryQueueSize", () => txnLogAppendRetryQueue.size)

override def generateRequests() = drainQueuedTransactionMarkers()

override def shutdown(): Unit = {
super.shutdown()
markersQueuePerBroker.clear()
Expand Down Expand Up @@ -191,7 +190,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
}
}

private[transaction] def drainQueuedTransactionMarkers(): Iterable[RequestAndCompletionHandler] = {
override def generateRequests(): Iterable[RequestAndCompletionHandler] = {
retryLogAppends()
val txnIdAndMarkerEntries: java.util.List[TxnIdAndMarkerEntry] = new util.ArrayList[TxnIdAndMarkerEntry]()
markersQueueForUnknownBroker.forEachTxnTopicPartition { case (_, queue) =>
Expand All @@ -209,6 +208,7 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
addTxnMarkersToBrokerQueue(transactionalId, producerId, producerEpoch, txnResult, coordinatorEpoch, topicPartitions)
}

val currentTimeMs = time.milliseconds()
markersQueuePerBroker.values.map { brokerRequestQueue =>
val txnIdAndMarkerEntries = new util.ArrayList[TxnIdAndMarkerEntry]()
brokerRequestQueue.forEachTxnTopicPartition { case (_, queue) =>
Expand All @@ -218,7 +218,14 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
}.filter { case (_, entries) => !entries.isEmpty }.map { case (node, entries) =>
val markersToSend = entries.asScala.map(_.txnMarkerEntry).asJava
val requestCompletionHandler = new TransactionMarkerRequestCompletionHandler(node.id, txnStateManager, this, entries)
RequestAndCompletionHandler(node, new WriteTxnMarkersRequest.Builder(writeTxnMarkersRequestVersion, markersToSend), requestCompletionHandler)
val request = new WriteTxnMarkersRequest.Builder(writeTxnMarkersRequestVersion, markersToSend)

RequestAndCompletionHandler(
currentTimeMs,
node,
request,
requestCompletionHandler
)
}
}

Expand Down
Loading