Merged
@@ -109,23 +109,43 @@ object TransactionMarkerChannelManager {

}

class TxnMarkerQueue(@volatile var destination: Node) {
class TxnMarkerQueue(@volatile var destination: Node) extends Logging {

// keep track of the requests per txn topic partition so we can easily clear the queue
// during partition emigration
private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, BlockingQueue[TxnIdAndMarkerEntry]]().asScala
private val markersPerTxnTopicPartition = new ConcurrentHashMap[Int, BlockingQueue[PendingCompleteTxnAndMarkerEntry]]().asScala
Contributor Author:
Now we keep track of the PendingCompleteTxn that was added to transactionsWithPendingMarkers.


def removeMarkersForTxnTopicPartition(partition: Int): Option[BlockingQueue[TxnIdAndMarkerEntry]] = {
def removeMarkersForTxnTopicPartition(partition: Int): Option[BlockingQueue[PendingCompleteTxnAndMarkerEntry]] = {
markersPerTxnTopicPartition.remove(partition)
}

def addMarkers(txnTopicPartition: Int, txnIdAndMarker: TxnIdAndMarkerEntry): Unit = {
val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, txnTopicPartition,
new LinkedBlockingQueue[TxnIdAndMarkerEntry]())
queue.add(txnIdAndMarker)
def addMarkers(txnTopicPartition: Int, pendingCompleteTxnAndMarker: PendingCompleteTxnAndMarkerEntry): Unit = {
val queue = CoreUtils.atomicGetOrUpdate(markersPerTxnTopicPartition, txnTopicPartition, {
// Note that this may get called more than once if threads have a close race while adding a new queue.
info(s"Creating new marker queue for txn partition $txnTopicPartition to destination broker ${destination.id}")
Member:
I noticed in the doc:

`createValue` may be invoked more than once if multiple threads attempt to insert a key at the same time

This seems ok, but just wanted to call it out

Contributor Author:
That's correct: this is logged when the value is created. More precise logic would be to make atomicGetOrUpdate return an indication of whether the value was actually created (then we'd log exactly once), but I didn't think it would be worth the complexity. At worst we'd get a couple of logs at the same time and only one of them would create the queue. I'll add a comment.
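The behavior under discussion can be sketched with a toy version of the helper. This is an assumed reimplementation for illustration only, not the actual `CoreUtils.atomicGetOrUpdate` code:

```scala
import scala.collection.concurrent.TrieMap

// Toy getOrElse + putIfAbsent pattern (an assumed sketch): createValue may be
// evaluated by several racing threads, but putIfAbsent ensures only one of the
// created values is ever published in the map.
def atomicGetOrUpdate[K, V](map: scala.collection.concurrent.Map[K, V],
                            key: K, createValue: => V): V =
  map.getOrElse(key, {
    val value = createValue                       // side effects (like the info log) can run more than once
    map.putIfAbsent(key, value).getOrElse(value)  // first writer wins; losers get the published value
  })

val queues = TrieMap[Int, String]()
val first = atomicGetOrUpdate(queues, 0, "new-queue")
```

This matches the ConcurrentHashMap caveat quoted above: the creation closure (and hence the log line) is not atomic with the publication, only the publication itself is.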

new LinkedBlockingQueue[PendingCompleteTxnAndMarkerEntry]()
})
queue.add(pendingCompleteTxnAndMarker)

if (markersPerTxnTopicPartition.get(txnTopicPartition).orNull != queue) {
// This could happen if the queue got removed concurrently.
Member:
Should we do something if we added to a "dead queue"?

Contributor Author:

As far as I can see, it shouldn't affect user-visible behavior. It does create an interesting state when the queue is removed in removeMarkersForTxnTopicPartition; we could have:

  1. [addMarkers] Retrieve queue.
  2. [removeMarkersForTxnTopicPartition] Remove queue.
  3. [removeMarkersForTxnTopicPartition] Iterate over the queue, but don't call removeMarkersForTxn because the queue is empty.
  4. [addMarkers] Add markers to the queue.

Now we've effectively removed the markers while transactionsWithPendingMarkers has an entry.

This state could last for a while if the removal happened on unload (and technically the txn id could expire, so the state may persist until broker restart), but as soon as a real workload on this txn id sends out markers, the proper entry will be created and the actual functionality will work as expected.

In other words, this race can lead to an orphan entry in transactionsWithPendingMarkers, but it doesn't affect anything (other than leaking a small amount of memory) until the markers are sent, and sending markers will fix it.
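The four-step race above can be reproduced in miniature with plain collections (hypothetical stand-ins for the real marker map and queues):

```scala
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}

// Stand-in for markersPerTxnTopicPartition: txn topic partition -> marker queue
val markers = new ConcurrentHashMap[Int, LinkedBlockingQueue[String]]()
val queue = new LinkedBlockingQueue[String]()
markers.put(0, queue)

val retrieved = markers.get(0)     // 1. [addMarkers] retrieve the queue
markers.remove(0)                  // 2. [removeMarkersForTxnTopicPartition] remove the queue
                                   // 3. the remover iterates an empty queue, so nothing is cleaned up
retrieved.add("marker-for-txn-a")  // 4. [addMarkers] the marker lands in the dead queue
// The map no longer references the queue, so this marker is orphaned until new
// markers for the same txn id recreate the entry.
```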

// Note that this could create an unexpected state when the queue is removed in
// removeMarkersForTxnTopicPartition; we could have:
//
// 1. [addMarkers] Retrieve queue.
// 2. [removeMarkersForTxnTopicPartition] Remove queue.
// 3. [removeMarkersForTxnTopicPartition] Iterate over queue, but not removeMarkersForTxn because queue is empty.
// 4. [addMarkers] Add markers to the queue.
//
// Now we've effectively removed the markers while transactionsWithPendingMarkers has an entry.
//
// While this could lead to an orphan entry in transactionsWithPendingMarkers, sending new markers
// will fix the state, so it shouldn't impact the state machine operation.
warn(s"Added $pendingCompleteTxnAndMarker to dead queue for txn partition $txnTopicPartition to destination broker ${destination.id}")
}
}

def forEachTxnTopicPartition[B](f:(Int, BlockingQueue[TxnIdAndMarkerEntry]) => B): Unit =
def forEachTxnTopicPartition[B](f:(Int, BlockingQueue[PendingCompleteTxnAndMarkerEntry]) => B): Unit =
markersPerTxnTopicPartition.forKeyValue { (partition, queue) =>
if (!queue.isEmpty) f(partition, queue)
}
@@ -187,17 +207,21 @@ class TransactionMarkerChannelManager(
// visible for testing
private[transaction] def queueForUnknownBroker = markersQueueForUnknownBroker

private[transaction] def addMarkersForBroker(broker: Node, txnTopicPartition: Int, txnIdAndMarker: TxnIdAndMarkerEntry): Unit = {
private[transaction] def addMarkersForBroker(broker: Node, txnTopicPartition: Int, pendingCompleteTxnAndMarker: PendingCompleteTxnAndMarkerEntry): Unit = {
val brokerId = broker.id

// we do not synchronize on the update of the broker node with the enqueuing,
// since even if there is a race condition we will just retry
val brokerRequestQueue = CoreUtils.atomicGetOrUpdate(markersQueuePerBroker, brokerId,
new TxnMarkerQueue(broker))
val brokerRequestQueue = CoreUtils.atomicGetOrUpdate(markersQueuePerBroker, brokerId, {
// Note that this may get called more than once if threads have a close race while adding a new queue.
info(s"Creating new marker queue map to destination broker $brokerId")
new TxnMarkerQueue(broker)
})
brokerRequestQueue.destination = broker
brokerRequestQueue.addMarkers(txnTopicPartition, txnIdAndMarker)
brokerRequestQueue.addMarkers(txnTopicPartition, pendingCompleteTxnAndMarker)

trace(s"Added marker ${txnIdAndMarker.txnMarkerEntry} for transactional id ${txnIdAndMarker.txnId} to destination broker $brokerId")
trace(s"Added marker ${pendingCompleteTxnAndMarker.txnMarkerEntry} for transactional id" +
s" ${pendingCompleteTxnAndMarker.pendingCompleteTxn.transactionalId} to destination broker $brokerId")
}

private def retryLogAppends(): Unit = {
@@ -211,29 +235,28 @@

override def generateRequests(): util.Collection[RequestAndCompletionHandler] = {
retryLogAppends()
val txnIdAndMarkerEntries: util.List[TxnIdAndMarkerEntry] = new util.ArrayList[TxnIdAndMarkerEntry]()
val pendingCompleteTxnAndMarkerEntries = new util.ArrayList[PendingCompleteTxnAndMarkerEntry]()
markersQueueForUnknownBroker.forEachTxnTopicPartition { case (_, queue) =>
queue.drainTo(txnIdAndMarkerEntries)
queue.drainTo(pendingCompleteTxnAndMarkerEntries)
}

for (txnIdAndMarker: TxnIdAndMarkerEntry <- txnIdAndMarkerEntries.asScala) {
val transactionalId = txnIdAndMarker.txnId
val producerId = txnIdAndMarker.txnMarkerEntry.producerId
val producerEpoch = txnIdAndMarker.txnMarkerEntry.producerEpoch
val txnResult = txnIdAndMarker.txnMarkerEntry.transactionResult
val coordinatorEpoch = txnIdAndMarker.txnMarkerEntry.coordinatorEpoch
val topicPartitions = txnIdAndMarker.txnMarkerEntry.partitions.asScala.toSet
for (pendingCompleteTxnAndMarker: PendingCompleteTxnAndMarkerEntry <- pendingCompleteTxnAndMarkerEntries.asScala) {
val producerId = pendingCompleteTxnAndMarker.txnMarkerEntry.producerId
val producerEpoch = pendingCompleteTxnAndMarker.txnMarkerEntry.producerEpoch
val txnResult = pendingCompleteTxnAndMarker.txnMarkerEntry.transactionResult
val pendingCompleteTxn = pendingCompleteTxnAndMarker.pendingCompleteTxn
val topicPartitions = pendingCompleteTxnAndMarker.txnMarkerEntry.partitions.asScala.toSet

addTxnMarkersToBrokerQueue(transactionalId, producerId, producerEpoch, txnResult, coordinatorEpoch, topicPartitions)
addTxnMarkersToBrokerQueue(producerId, producerEpoch, txnResult, pendingCompleteTxn, topicPartitions)
}

val currentTimeMs = time.milliseconds()
markersQueuePerBroker.values.map { brokerRequestQueue =>
val txnIdAndMarkerEntries = new util.ArrayList[TxnIdAndMarkerEntry]()
val pendingCompleteTxnAndMarkerEntries = new util.ArrayList[PendingCompleteTxnAndMarkerEntry]()
brokerRequestQueue.forEachTxnTopicPartition { case (_, queue) =>
queue.drainTo(txnIdAndMarkerEntries)
queue.drainTo(pendingCompleteTxnAndMarkerEntries)
}
(brokerRequestQueue.destination, txnIdAndMarkerEntries)
(brokerRequestQueue.destination, pendingCompleteTxnAndMarkerEntries)
}.filter { case (_, entries) => !entries.isEmpty }.map { case (node, entries) =>
val markersToSend = entries.asScala.map(_.txnMarkerEntry).asJava
val requestCompletionHandler = new TransactionMarkerRequestCompletionHandler(node.id, txnStateManager, this, entries)
@@ -300,9 +323,12 @@ class TransactionMarkerChannelManager(
txnMetadata,
newMetadata)

transactionsWithPendingMarkers.put(transactionalId, pendingCompleteTxn)
addTxnMarkersToBrokerQueue(transactionalId, txnMetadata.producerId,
txnMetadata.producerEpoch, txnResult, coordinatorEpoch, txnMetadata.topicPartitions.toSet)
val prev = transactionsWithPendingMarkers.put(transactionalId, pendingCompleteTxn)
if (prev != null) {
info(s"Replaced an existing pending complete txn $prev with $pendingCompleteTxn while adding markers to send.")
}
addTxnMarkersToBrokerQueue(txnMetadata.producerId,
txnMetadata.producerEpoch, txnResult, pendingCompleteTxn, txnMetadata.topicPartitions.toSet)
maybeWriteTxnCompletion(transactionalId)
}

@@ -354,41 +380,42 @@ class TransactionMarkerChannelManager(
txnLogAppend.newMetadata, appendCallback, _ == Errors.COORDINATOR_NOT_AVAILABLE, RequestLocal.NoCaching)
}

def addTxnMarkersToBrokerQueue(transactionalId: String,
producerId: Long,
def addTxnMarkersToBrokerQueue(producerId: Long,
producerEpoch: Short,
result: TransactionResult,
coordinatorEpoch: Int,
pendingCompleteTxn: PendingCompleteTxn,
Contributor Author:
pendingCompleteTxn has the transactional id and coordinator epoch, so we don't need to pass them explicitly.

topicPartitions: immutable.Set[TopicPartition]): Unit = {
val txnTopicPartition = txnStateManager.partitionFor(transactionalId)
val txnTopicPartition = txnStateManager.partitionFor(pendingCompleteTxn.transactionalId)
val partitionsByDestination: immutable.Map[Option[Node], immutable.Set[TopicPartition]] = topicPartitions.groupBy { topicPartition: TopicPartition =>
metadataCache.getPartitionLeaderEndpoint(topicPartition.topic, topicPartition.partition, interBrokerListenerName)
}

val coordinatorEpoch = pendingCompleteTxn.coordinatorEpoch
for ((broker: Option[Node], topicPartitions: immutable.Set[TopicPartition]) <- partitionsByDestination) {
broker match {
case Some(brokerNode) =>
val marker = new TxnMarkerEntry(producerId, producerEpoch, coordinatorEpoch, result, topicPartitions.toList.asJava)
val txnIdAndMarker = TxnIdAndMarkerEntry(transactionalId, marker)
val pendingCompleteTxnAndMarker = PendingCompleteTxnAndMarkerEntry(pendingCompleteTxn, marker)

if (brokerNode == Node.noNode) {
// if the leader of the partition is known but node not available, put it into an unknown broker queue
// and let the sender thread to look for its broker and migrate them later
markersQueueForUnknownBroker.addMarkers(txnTopicPartition, txnIdAndMarker)
markersQueueForUnknownBroker.addMarkers(txnTopicPartition, pendingCompleteTxnAndMarker)
} else {
addMarkersForBroker(brokerNode, txnTopicPartition, txnIdAndMarker)
addMarkersForBroker(brokerNode, txnTopicPartition, pendingCompleteTxnAndMarker)
}

case None =>
val transactionalId = pendingCompleteTxn.transactionalId
txnStateManager.getTransactionState(transactionalId) match {
case Left(error) =>
info(s"Encountered $error trying to fetch transaction metadata for $transactionalId with coordinator epoch $coordinatorEpoch; cancel sending markers to its partition leaders")
transactionsWithPendingMarkers.remove(transactionalId)
transactionsWithPendingMarkers.remove(transactionalId, pendingCompleteTxn)

case Right(Some(epochAndMetadata)) =>
if (epochAndMetadata.coordinatorEpoch != coordinatorEpoch) {
info(s"The cached metadata has changed to $epochAndMetadata (old coordinator epoch is $coordinatorEpoch) since preparing to send markers; cancel sending markers to its partition leaders")
transactionsWithPendingMarkers.remove(transactionalId)
transactionsWithPendingMarkers.remove(transactionalId, pendingCompleteTxn)
} else {
// if the leader of the partition is unknown, skip sending the txn marker since
// the partition is likely to be deleted already
@@ -419,25 +446,34 @@

def removeMarkersForTxnTopicPartition(txnTopicPartitionId: Int): Unit = {
markersQueueForUnknownBroker.removeMarkersForTxnTopicPartition(txnTopicPartitionId).foreach { queue =>
for (entry: TxnIdAndMarkerEntry <- queue.asScala)
removeMarkersForTxnId(entry.txnId)
for (entry <- queue.asScala) {
Member:
Do we have an idea of how many in-flight markers we may have at once? Just want to make sure this log message isn't too spammy. I wonder if we could log all the markers in a single log message.

Contributor Author:
In the cases I investigated it was a couple dozen or so, but I don't have precise stats from a large selection of cases. We log messages on every retry of a failed marker send; I think this message would be much less spammy than that (these would happen only when partitions change).
The disadvantage of a single log message is that it would eventually get truncated, whereas these per-entry logs help show interesting transitions when investigating race conditions related to load/unload.

info(s"Removing $entry for txn partition $txnTopicPartitionId to destination broker -1")
removeMarkersForTxn(entry.pendingCompleteTxn)
}
}

markersQueuePerBroker.foreach { case(_, brokerQueue) =>
markersQueuePerBroker.foreach { case(brokerId, brokerQueue) =>
brokerQueue.removeMarkersForTxnTopicPartition(txnTopicPartitionId).foreach { queue =>
for (entry: TxnIdAndMarkerEntry <- queue.asScala)
removeMarkersForTxnId(entry.txnId)
for (entry <- queue.asScala) {
info(s"Removing $entry for txn partition $txnTopicPartitionId to destination broker $brokerId")
removeMarkersForTxn(entry.pendingCompleteTxn)
}
}
}
}

def removeMarkersForTxnId(transactionalId: String): Unit = {
transactionsWithPendingMarkers.remove(transactionalId)
def removeMarkersForTxn(pendingCompleteTxn: PendingCompleteTxn): Unit = {
val transactionalId = pendingCompleteTxn.transactionalId
val removed = transactionsWithPendingMarkers.remove(transactionalId, pendingCompleteTxn)
Contributor Author:
This is the core change: use the original pendingCompleteTxn value to remove the entry. The rest of the change is pretty much plumbing so that we can supply the correct pendingCompleteTxn.
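The fix relies on `ConcurrentHashMap`'s conditional two-argument `remove(key, value)`, which unlinks the entry only if the key still maps to that exact value, so a stale completion cannot evict a newer `PendingCompleteTxn`. A minimal sketch with placeholder string values:

```scala
import java.util.concurrent.ConcurrentHashMap

// Stand-in for transactionsWithPendingMarkers: txn id -> pending complete txn
val pending = new ConcurrentHashMap[String, String]()
pending.put("txn-1", "pendingTxnB")  // a newer attempt has replaced pendingTxnA

val staleRemove = pending.remove("txn-1", "pendingTxnA") // stale value: no-op
val liveRemove  = pending.remove("txn-1", "pendingTxnB") // current value: removed
```

With the old single-argument `remove(transactionalId)`, the stale call would have deleted the newer entry unconditionally; the conditional form turns it into a no-op.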

if (!removed) {
val current = transactionsWithPendingMarkers.get(transactionalId)
if (current != null) {
info(s"Failed to remove pending marker entry $current trying to remove $pendingCompleteTxn")
}
}
}
}

case class TxnIdAndMarkerEntry(txnId: String, txnMarkerEntry: TxnMarkerEntry)

case class PendingCompleteTxn(transactionalId: String,
coordinatorEpoch: Int,
txnMetadata: TransactionMetadata,
@@ -451,3 +487,5 @@ case class PendingCompleteTxn(transactionalId: String,
s"newMetadata=$newMetadata)"
}
}

case class PendingCompleteTxnAndMarkerEntry(pendingCompleteTxn: PendingCompleteTxn, txnMarkerEntry: TxnMarkerEntry)