Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 22 additions & 20 deletions core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -296,46 +296,48 @@ class Partition(val topic: String,
}

/*
* Note that this method will only be called if requiredAcks = -1
* and we are waiting for all replicas in ISR to be fully caught up to
* the (local) leader's offset corresponding to this produce request
* before we acknowledge the produce request.
* Returns a tuple where the first element is a boolean indicating whether enough replicas reached `requiredOffset`
* and the second element is an error (which would be `Errors.NONE` for no error).
*
* Note that this method will only be called if requiredAcks = -1 and we are waiting for all replicas in ISR to be
* fully caught up to the (local) leader's offset corresponding to this produce request before we acknowledge the
* produce request.
*/
def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Short) = {
def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) = {
leaderReplicaIfLocal() match {
case Some(leaderReplica) =>
// keep the current immutable replica list reference
val curInSyncReplicas = inSyncReplicas
val numAcks = curInSyncReplicas.count(r => {

def numAcks = curInSyncReplicas.count { r =>
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guozhangwang @junrao Is it correct that we don't use numAcks in this method outside of logging?

if (!r.isLocal)
if (r.logEndOffset.messageOffset >= requiredOffset) {
trace("Replica %d of %s-%d received offset %d".format(r.brokerId, topic, partitionId, requiredOffset))
trace(s"Replica ${r.brokerId} of ${topic}-${partitionId} received offset $requiredOffset")
true
}
else
false
else
true /* also count the local (leader) replica */
})
}

trace("%d acks satisfied for %s-%d with acks = -1".format(numAcks, topic, partitionId))
trace(s"$numAcks acks satisfied for ${topic}-${partitionId} with acks = -1")

val minIsr = leaderReplica.log.get.config.minInSyncReplicas

if (leaderReplica.highWatermark.messageOffset >= requiredOffset ) {
if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
/*
* The topic may be configured not to accept messages if there are not enough replicas in ISR
* in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
*/
if (minIsr <= curInSyncReplicas.size) {
(true, Errors.NONE.code)
} else {
(true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND.code)
}
* The topic may be configured not to accept messages if there are not enough replicas in ISR
* in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
*/
if (minIsr <= curInSyncReplicas.size)
(true, Errors.NONE)
else
(true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
} else
(false, Errors.NONE.code)
(false, Errors.NONE)
case None =>
(false, Errors.NOT_LEADER_FOR_PARTITION.code)
(false, Errors.NOT_LEADER_FOR_PARTITION)
}
}

Expand Down
22 changes: 8 additions & 14 deletions core/src/main/scala/kafka/server/DelayedProduce.scala
Original file line number Diff line number Diff line change
Expand Up @@ -82,32 +82,26 @@ class DelayedProduce(delayMs: Long,
override def tryComplete(): Boolean = {
// check for each partition if it still has pending acks
produceMetadata.produceStatus.foreach { case (topicAndPartition, status) =>
trace("Checking produce satisfaction for %s, current status %s"
.format(topicAndPartition, status))
trace(s"Checking produce satisfaction for ${topicAndPartition}, current status $status")
// skip those partitions that have already been satisfied
if (status.acksPending) {
val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
val (hasEnough, errorCode) = partitionOpt match {
val (hasEnough, error) = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition) match {
case Some(partition) =>
partition.checkEnoughReplicasReachOffset(status.requiredOffset)
case None =>
// Case A
(false, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
(false, Errors.UNKNOWN_TOPIC_OR_PARTITION)
}
if (errorCode != Errors.NONE.code) {
// Case B.1
// Case B.1 || B.2
if (error != Errors.NONE || hasEnough) {
status.acksPending = false
status.responseStatus.errorCode = errorCode
} else if (hasEnough) {
// Case B.2
status.acksPending = false
status.responseStatus.errorCode = Errors.NONE.code
status.responseStatus.errorCode = error.code
}
}
}

// check if each partition has satisfied at lease one of case A and case B
if (! produceMetadata.produceStatus.values.exists(p => p.acksPending))
// check if every partition has satisfied at least one of case A or B
if (!produceMetadata.produceStatus.values.exists(_.acksPending))
forceComplete()
else
false
Expand Down