Merged
2 changes: 2 additions & 0 deletions checkstyle/import-control-jmh-benchmarks.xml
@@ -36,7 +36,9 @@
<allow pkg="kafka.log"/>
<allow pkg="kafka.server"/>
<allow pkg="kafka.api"/>
<allow class="kafka.utils.Pool"/>
<allow class="kafka.utils.KafkaScheduler"/>
<allow class="org.apache.kafka.clients.FetchSessionHandler"/>
<allow pkg="org.mockito"/>


clients/src/main/java/org/apache/kafka/clients/FetchSessionHandler.java
@@ -185,7 +185,18 @@ public class Builder {
* Another reason is because we make use of the list ordering to optimize the preparation of
* incremental fetch requests (see below).
*/
private LinkedHashMap<TopicPartition, PartitionData> next = new LinkedHashMap<>();
private LinkedHashMap<TopicPartition, PartitionData> next;
Contributor:

I'd suggest we go one step further and make sessionPartitions a local variable only: in the later handling logic after the response comes back, we only need sessionPartitions.keySet / size, which we could maintain separately as a member field populated after the build() call. WDYT?

Contributor:

NVM, I realized you also need sessionPartitions to save one fetch-state copy as well.

private final boolean copySessionPartitions;

Builder() {
this.next = new LinkedHashMap<>();
this.copySessionPartitions = true;
}

Builder(int initialSize, boolean copySessionPartitions) {
this.next = new LinkedHashMap<>(initialSize);
this.copySessionPartitions = copySessionPartitions;
}

/**
* Mark that we want data from this partition in the upcoming fetch.
@@ -215,15 +226,10 @@ public FetchRequestData build() {
Entry<TopicPartition, PartitionData> entry = iter.next();
TopicPartition topicPartition = entry.getKey();
PartitionData prevData = entry.getValue();
PartitionData nextData = next.get(topicPartition);
PartitionData nextData = next.remove(topicPartition);
if (nextData != null) {
if (prevData.equals(nextData)) {
// Omit this partition from the FetchRequest, because it hasn't changed
// since the previous request.
next.remove(topicPartition);
} else {
// Move the altered partition to the end of 'next'
next.remove(topicPartition);
if (!prevData.equals(nextData)) {
// Re-add the altered partition to the end of 'next'
next.put(topicPartition, nextData);
entry.setValue(nextData);
altered.add(topicPartition);
@@ -255,10 +261,10 @@ public FetchRequestData build() {
partitionsToLogString(altered), partitionsToLogString(removed),
partitionsToLogString(sessionPartitions.keySet()));
}
Map<TopicPartition, PartitionData> toSend =
Collections.unmodifiableMap(new LinkedHashMap<>(next));
Map<TopicPartition, PartitionData> curSessionPartitions =
Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions));
Map<TopicPartition, PartitionData> toSend = Collections.unmodifiableMap(next);
Map<TopicPartition, PartitionData> curSessionPartitions = copySessionPartitions
? Collections.unmodifiableMap(new LinkedHashMap<>(sessionPartitions))
: Collections.unmodifiableMap(sessionPartitions);
next = null;
return new FetchRequestData(toSend, Collections.unmodifiableList(removed),
curSessionPartitions, nextMetadata);
@@ -269,6 +275,18 @@ public Builder newBuilder() {
return new Builder();
}


/** A builder that allows for presizing the PartitionData hashmap, and avoiding making a
* secondary copy of the sessionPartitions, in cases where this is not necessary.
* This builder is primarily for use by the Replica Fetcher
* @param size the initial size of the PartitionData hashmap
* @param copySessionPartitions boolean denoting whether the builder should make a deep copy of
* session partitions
*/
public Builder newBuilder(int size, boolean copySessionPartitions) {
return new Builder(size, copySessionPartitions);
}

private String partitionsToLogString(Collection<TopicPartition> partitions) {
if (!log.isTraceEnabled()) {
return String.format("%d partition(s)", partitions.size());
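For orientation, a hedged sketch of how the two builder entry points above are intended to be used (the replica fetcher's actual call appears in ReplicaFetcherThread.scala further down; the handler construction and the presize value here are illustrative assumptions):

```scala
import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.utils.LogContext

// Illustrative only: one handler, two ways to start building a fetch request.
val handler = new FetchSessionHandler(new LogContext(), 1)

// Default path (e.g. the consumer): build() will deep-copy sessionPartitions.
val defaultBuilder = handler.newBuilder()

// Replica-fetcher path added by this PR: presize the partition map and skip
// the defensive copy, since the fetcher does not reuse the map after the
// response has been processed.
val presizedBuilder = handler.newBuilder(16, false)
```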
clients/src/main/java/org/apache/kafka/common/internals/PartitionStates.java
@@ -101,7 +101,7 @@ public Stream<PartitionState<S>> stream() {
}

public LinkedHashMap<TopicPartition, S> partitionStateMap() {
return new LinkedHashMap<>(map);
return map;
}

/**
117 changes: 59 additions & 58 deletions core/src/main/scala/kafka/server/AbstractFetcherThread.scala

Large diffs are not rendered by default.
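The ReplicaFetch type imported and constructed throughout this PR is defined in AbstractFetcherThread.scala, whose diff is not rendered here. Judging from the call sites (ReplicaFetch(requestMap, requestBuilder) and ReplicaFetch(fetchData.sessionPartitions(), requestBuilder)), it is presumably a small case class along these lines; the exact signature is an assumption:

```scala
import java.util
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.FetchRequest
import scala.collection.Set

object AbstractFetcherThread {
  // Pairs the partition data gathered while building the fetch with the
  // request builder itself, so the response-handling path can reuse that map
  // directly instead of copying the session partitions again.
  case class ReplicaFetch(partitionData: util.Map[TopicPartition, FetchRequest.PartitionData],
                          fetchRequest: FetchRequest.Builder)

  // Pre-existing shape: a result plus the partitions that hit an error.
  case class ResultWithPartitions[R](result: R, partitionsWithError: Set[TopicPartition])
}
```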

19 changes: 12 additions & 7 deletions core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -23,6 +23,7 @@ import java.util.Optional
import kafka.api.Request
import kafka.cluster.BrokerEndPoint
import kafka.log.LogAppendInfo
import kafka.server.AbstractFetcherThread.ReplicaFetch
import kafka.server.AbstractFetcherThread.ResultWithPartitions
import kafka.server.QuotaFactory.UnboundedQuota
import org.apache.kafka.common.TopicPartition
@@ -34,7 +35,7 @@ import org.apache.kafka.common.requests.FetchResponse.PartitionData
import org.apache.kafka.common.requests.{EpochEndOffset, FetchRequest, FetchResponse}

import scala.collection.JavaConverters._
import scala.collection.{Map, Seq, Set, mutable}
import scala.collection.{mutable, Map, Seq, Set}

class ReplicaAlterLogDirsThread(name: String,
sourceBroker: BrokerEndPoint,
@@ -59,6 +60,10 @@ class ReplicaAlterLogDirsThread(name: String,
replicaMgr.futureLocalLogOrException(topicPartition).latestEpoch
}

override protected def logStartOffset(topicPartition: TopicPartition): Long = {
replicaMgr.futureLocalLogOrException(topicPartition).logStartOffset
}

override protected def logEndOffset(topicPartition: TopicPartition): Long = {
replicaMgr.futureLocalLogOrException(topicPartition).logEndOffset
}
@@ -67,7 +72,7 @@ class ReplicaAlterLogDirsThread(name: String,
replicaMgr.futureLocalLogOrException(topicPartition).endOffsetForEpoch(epoch)
}

def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, FetchData)] = {
def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = {
var partitionData: Seq[(TopicPartition, FetchResponse.PartitionData[Records])] = null
val request = fetchRequest.build()

@@ -95,7 +100,7 @@ class ReplicaAlterLogDirsThread(name: String,
if (partitionData == null)
throw new IllegalStateException(s"Failed to fetch data for partitions ${request.fetchData.keySet().toArray.mkString(",")}")

partitionData
partitionData.toMap
}

// process fetched data
@@ -218,7 +223,7 @@ class ReplicaAlterLogDirsThread(name: String,
nextPartitionOpt
}

private def buildFetchForPartition(tp: TopicPartition, fetchState: PartitionFetchState): ResultWithPartitions[Option[FetchRequest.Builder]] = {
private def buildFetchForPartition(tp: TopicPartition, fetchState: PartitionFetchState): ResultWithPartitions[Option[ReplicaFetch]] = {
val requestMap = new util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData]
val partitionsWithError = mutable.Set[TopicPartition]()

@@ -237,14 +242,14 @@ class ReplicaAlterLogDirsThread(name: String,
} else {
// Set maxWait and minBytes to 0 because the response should return immediately if
// the future log has caught up with the current log of the partition
Some(FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 0, requestMap)
.setMaxBytes(maxBytes))
val requestBuilder = FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 0, requestMap).setMaxBytes(maxBytes)
Some(ReplicaFetch(requestMap, requestBuilder))
}

ResultWithPartitions(fetchRequestOpt, partitionsWithError)
}

def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = {
// Only include replica in the fetch request if it is not throttled.
if (quota.isQuotaExceeded) {
ResultWithPartitions(None, Set.empty)
30 changes: 17 additions & 13 deletions core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -22,6 +22,7 @@ import java.util.Optional
import kafka.api._
import kafka.cluster.BrokerEndPoint
import kafka.log.LogAppendInfo
import kafka.server.AbstractFetcherThread.ReplicaFetch
import kafka.server.AbstractFetcherThread.ResultWithPartitions
import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.TopicPartition
@@ -34,7 +35,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.utils.{LogContext, Time}

import scala.collection.JavaConverters._
import scala.collection.{Map, mutable}
import scala.collection.{mutable, Map}

class ReplicaFetcherThread(name: String,
fetcherId: Int,
@@ -96,12 +97,16 @@ class ReplicaFetcherThread(name: String,
private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
private val fetchSize = brokerConfig.replicaFetchMaxBytes
private val brokerSupportsLeaderEpochRequest = brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2
private val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)
val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)

override protected def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
replicaMgr.localLogOrException(topicPartition).latestEpoch
}

override protected def logStartOffset(topicPartition: TopicPartition): Long = {
replicaMgr.localLogOrException(topicPartition).logStartOffset
}

override protected def logEndOffset(topicPartition: TopicPartition): Long = {
replicaMgr.localLogOrException(topicPartition).logEndOffset
}
@@ -191,14 +196,14 @@ class ReplicaFetcherThread(name: String,
}


override protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, FetchData)] = {
override protected def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = {
try {
val clientResponse = leaderEndpoint.sendRequest(fetchRequest)
val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse[Records]]
if (!fetchSessionHandler.handleResponse(fetchResponse)) {
Nil
Map.empty
} else {
fetchResponse.responseData.asScala.toSeq
fetchResponse.responseData.asScala
}
} catch {
case t: Throwable =>
@@ -236,15 +241,15 @@ class ReplicaFetcherThread(name: String,
}
}

override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = {
val partitionsWithError = mutable.Set[TopicPartition]()

val builder = fetchSessionHandler.newBuilder()
val builder = fetchSessionHandler.newBuilder(partitionMap.size, false)
Contributor Author:

Use the new builder which presizes the PartitionData hashmap, and which does not make another copy of sessionPartitions. We will not be modifying sessionPartitions, or using it again after we have processed the fetch request, so we do not need to make an unnecessary copy.

Contributor:

Hmm, here, we choose not to copy sessionPartitions in the builder. However, in line 265 and line 273, we still read sessionPartitions. Is that correct?

Contributor Author:

I believe this is still correct. We do not copy sessionPartitions, but sessionPartitions will also not be modified until the next fetch/doWork iteration, and all use of it is from the same thread. I believe it is thus safe to use it without making a copy.

Contributor:

Thanks for the explanation. That makes sense.

partitionMap.foreach { case (topicPartition, fetchState) =>
// We will not include a replica in the fetch request if it should be throttled.
if (fetchState.isReadyForFetch && !shouldFollowerThrottle(quota, topicPartition)) {
if (fetchState.isReadyForFetch && !shouldFollowerThrottle(quota, fetchState, topicPartition)) {
try {
val logStartOffset = replicaMgr.localLogOrException(topicPartition).logStartOffset
val logStartOffset = this.logStartOffset(topicPartition)
builder.add(topicPartition, new FetchRequest.PartitionData(
fetchState.fetchOffset, logStartOffset, fetchSize, Optional.of(fetchState.currentLeaderEpoch)))
} catch {
@@ -265,7 +270,7 @@ class ReplicaFetcherThread(name: String,
.setMaxBytes(maxBytes)
.toForget(fetchData.toForget)
.metadata(fetchData.metadata)
Some(requestBuilder)
Some(ReplicaFetch(fetchData.sessionPartitions(), requestBuilder))
Contributor Author:

By passing through sessionPartitions we can avoid making another copy of the PartitionMap in https://github.com/apache/kafka/pull/7443/files#diff-2d03a5d4349a23e56c09b4e773055249L119. sessionPartitions contains all of the PartitionData that we need to process the fetch response.

}

ResultWithPartitions(fetchRequestOpt, partitionsWithError)
@@ -330,9 +335,8 @@ class ReplicaFetcherThread(name: String,
* To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
* the quota is exceeded and the replica is not in sync.
*/
private def shouldFollowerThrottle(quota: ReplicaQuota, topicPartition: TopicPartition): Boolean = {
val isReplicaInSync = fetcherLagStats.isReplicaInSync(topicPartition)
!isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = {
!fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
Contributor Author:

We avoid a HashMap lookup here by including the lag in the PartitionFetchState directly.

}

}
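The last review comment refers to PartitionFetchState, also defined in the unrendered AbstractFetcherThread.scala diff: the follower's lag is now carried in the fetch state itself, so shouldFollowerThrottle no longer needs a fetcherLagStats HashMap lookup. A hedged sketch consistent with the four-argument construction used in the tests below; the field order and the exact in-sync rule are assumptions:

```scala
// Assumed shape only: the real PartitionFetchState carries additional fields
// (for example an optional delay used for backoff).
sealed trait ReplicaState
case object Truncating extends ReplicaState
case object Fetching extends ReplicaState

case class PartitionFetchState(fetchOffset: Long,
                               lag: Option[Long],
                               currentLeaderEpoch: Int,
                               state: ReplicaState) {
  // Lag is refreshed from each fetch response; a follower counts as in sync
  // once that lag reaches zero. An unknown lag (None) is treated as not in
  // sync, so throttling can still apply to freshly added partitions.
  def isReplicaInSync: Boolean = lag.exists(_ <= 0)
}
```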
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
@@ -59,7 +59,7 @@ class AbstractFetcherManagerTest {
EasyMock.expect(fetcher.start())
EasyMock.expect(fetcher.addPartitions(Map(tp -> OffsetAndEpoch(fetchOffset, leaderEpoch))))
EasyMock.expect(fetcher.fetchState(tp))
.andReturn(Some(PartitionFetchState(fetchOffset, leaderEpoch, Truncating)))
.andReturn(Some(PartitionFetchState(fetchOffset, None, leaderEpoch, Truncating)))
EasyMock.expect(fetcher.removePartitions(Set(tp)))
EasyMock.expect(fetcher.fetchState(tp)).andReturn(None)
EasyMock.replay(fetcher)
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -25,6 +25,7 @@ import com.yammer.metrics.Metrics
import kafka.cluster.BrokerEndPoint
import kafka.log.LogAppendInfo
import kafka.message.NoCompressionCodec
import kafka.server.AbstractFetcherThread.ReplicaFetch
import kafka.server.AbstractFetcherThread.ResultWithPartitions
import kafka.utils.TestUtils
import org.apache.kafka.common.KafkaException
@@ -38,7 +39,7 @@ import org.junit.Assert._
import org.junit.{Before, Test}

import scala.collection.JavaConverters._
import scala.collection.{Map, Set, mutable}
import scala.collection.{mutable, Map, Set}
import scala.util.Random
import org.scalatest.Assertions.assertThrows

@@ -575,7 +576,7 @@ class AbstractFetcherThreadTest {

val fetcher = new MockFetcherThread {
var fetchedOnce = false
override def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, FetchData)] = {
override def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = {
val fetchedData = super.fetchFromLeader(fetchRequest)
if (!fetchedOnce) {
val records = fetchedData.head._2.records.asInstanceOf[MemoryRecords]
@@ -901,7 +902,7 @@ class AbstractFetcherThreadTest {
state.highWatermark = offset
}

override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[FetchRequest.Builder]] = {
override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = {
val fetchData = mutable.Map.empty[TopicPartition, FetchRequest.PartitionData]
partitionMap.foreach { case (partition, state) =>
if (state.isReadyForFetch) {
@@ -911,14 +912,16 @@ class AbstractFetcherThreadTest {
}
}
val fetchRequest = FetchRequest.Builder.forReplica(ApiKeys.FETCH.latestVersion, replicaId, 0, 1, fetchData.asJava)
ResultWithPartitions(Some(fetchRequest), Set.empty)
ResultWithPartitions(Some(ReplicaFetch(fetchData.asJava, fetchRequest)), Set.empty)
}

override def latestEpoch(topicPartition: TopicPartition): Option[Int] = {
val state = replicaPartitionState(topicPartition)
state.log.lastOption.map(_.partitionLeaderEpoch).orElse(Some(EpochEndOffset.UNDEFINED_EPOCH))
}

override def logStartOffset(topicPartition: TopicPartition): Long = replicaPartitionState(topicPartition).logStartOffset

override def logEndOffset(topicPartition: TopicPartition): Long = replicaPartitionState(topicPartition).logEndOffset

override def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Option[OffsetAndEpoch] = {
@@ -973,7 +976,7 @@ class AbstractFetcherThreadTest {

override protected def isOffsetForLeaderEpochSupported: Boolean = true

override def fetchFromLeader(fetchRequest: FetchRequest.Builder): Seq[(TopicPartition, FetchData)] = {
override def fetchFromLeader(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = {
fetchRequest.fetchData.asScala.map { case (partition, fetchData) =>
val leaderState = leaderPartitionState(partition)
val epochCheckError = checkExpectedLeaderEpoch(fetchData.currentLeaderEpoch, leaderState)
Expand All @@ -1000,7 +1003,7 @@ class AbstractFetcherThreadTest {

(partition, new FetchData(error, leaderState.highWatermark, leaderState.highWatermark, leaderState.logStartOffset,
List.empty.asJava, records))
}.toSeq
}.toMap
}

private def checkLeaderEpochAndThrow(expectedEpoch: Int, partitionState: PartitionState): Unit = {