2 changes: 1 addition & 1 deletion build.gradle
@@ -1658,6 +1658,7 @@ project(':transaction-coordinator') {
implementation libs.jacksonDatabind
implementation project(':clients')
implementation project(':server-common')
implementation project(':coordinator-common')
implementation libs.slf4jApi

testImplementation libs.junitJupiter
@@ -2262,7 +2263,6 @@ project(':storage') {
implementation project(':storage:storage-api')
implementation project(':server-common')
implementation project(':clients')
implementation project(':transaction-coordinator')
implementation(libs.caffeine) {
exclude group: 'org.checkerframework', module: 'checker-qual'
}
1 change: 1 addition & 0 deletions checkstyle/import-control-transaction-coordinator.xml
@@ -34,6 +34,7 @@
<subpackage name="transaction">
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.coordinator.common.runtime" />
<allow pkg="org.apache.kafka.coordinator.transaction" />
<allow pkg="org.apache.kafka.common" />
<allow pkg="org.slf4j" />
core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala
@@ -19,7 +19,7 @@ package kafka.coordinator.transaction
import java.nio.ByteBuffer
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.protocol.{ByteBufferAccessor, MessageUtil}
import org.apache.kafka.common.record.{Record, RecordBatch}
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.coordinator.transaction.generated.{TransactionLogKey, TransactionLogValue}
import org.apache.kafka.server.common.TransactionVersion
@@ -143,33 +143,6 @@ object TransactionLog {
} else throw new IllegalStateException(s"Unknown version $version from the transaction log message value")
}
}

/**
* Exposed for printing records using [[kafka.tools.DumpLogSegments]]
*/
def formatRecordKeyAndValue(record: Record): (Option[String], Option[String]) = {
TransactionLog.readTxnRecordKey(record.key) match {
case txnKey: TxnKey =>
val keyString = s"transaction_metadata::transactionalId=${txnKey.transactionalId}"

val valueString = TransactionLog.readTxnRecordValue(txnKey.transactionalId, record.value) match {
case None => "<DELETE>"

case Some(txnMetadata) => s"producerId:${txnMetadata.producerId}," +
s"producerEpoch:${txnMetadata.producerEpoch}," +
s"state=${txnMetadata.state}," +
s"partitions=${txnMetadata.topicPartitions.mkString("[", ",", "]")}," +
s"txnLastUpdateTimestamp=${txnMetadata.txnLastUpdateTimestamp}," +
s"txnTimeoutMs=${txnMetadata.txnTimeoutMs}"
}

(Some(keyString), Some(valueString))

case unknownKey: UnknownKey =>
(Some(s"unknown::version=${unknownKey.version}"), None)
}
}

}

sealed trait BaseKey{
59 changes: 56 additions & 3 deletions core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.JsonNode

import java.io._
import com.fasterxml.jackson.databind.node.{IntNode, JsonNodeFactory, ObjectNode, TextNode}
import kafka.coordinator.transaction.TransactionLog
import kafka.log._
import kafka.utils.CoreUtils
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
@@ -42,6 +41,8 @@ import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.UnknownReco
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde
import org.apache.kafka.coordinator.share.ShareCoordinatorRecordSerde
import org.apache.kafka.coordinator.share.generated.{ShareSnapshotKey, ShareSnapshotKeyJsonConverter, ShareSnapshotValue, ShareSnapshotValueJsonConverter, ShareUpdateKey, ShareUpdateKeyJsonConverter, ShareUpdateValue, ShareUpdateValueJsonConverter}
import org.apache.kafka.coordinator.transaction.TransactionCoordinatorRecordSerde
import org.apache.kafka.coordinator.transaction.generated.{TransactionLogKey, TransactionLogKeyJsonConverter, TransactionLogValue, TransactionLogValueJsonConverter}
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.metadata.bootstrap.BootstrapDirectory
import org.apache.kafka.snapshot.Snapshots
@@ -584,9 +585,61 @@ object DumpLogSegments {
}
}

private class TransactionLogMessageParser extends MessageParser[String, String] {
// Package private for testing.
class TransactionLogMessageParser extends MessageParser[String, String] {
private val serde = new TransactionCoordinatorRecordSerde()

private def prepareKey(message: Message, version: Short): String = {
val messageAsJson = message match {
case m: TransactionLogKey =>
TransactionLogKeyJsonConverter.write(m, version)
case _ => throw new UnknownRecordTypeException(version)
}

val json = new ObjectNode(JsonNodeFactory.instance)
json.set("type", new TextNode(version.toString))
json.set("data", messageAsJson)
json.toString
}

private def prepareValue(message: Message, version: Short): String = {
val messageAsJson = message match {
case m: TransactionLogValue =>
TransactionLogValueJsonConverter.write(m, version)
case _ => throw new UnknownRecordTypeException(version)
}

val json = new ObjectNode(JsonNodeFactory.instance)
json.set("type", new TextNode(version.toString))
json.set("data", messageAsJson)
json.toString
}

override def parse(record: Record): (Option[String], Option[String]) = {
TransactionLog.formatRecordKeyAndValue(record)
if (!record.hasKey)
throw new RuntimeException(s"Failed to decode message at offset ${record.offset} using offset " +
"transaction-log decoder (message had a missing key)")

try {
val r = serde.deserialize(record.key, record.value)
(
Some(prepareKey(r.key.message, r.key.version)),
Option(r.value).map(v => prepareValue(v.message, v.version)).orElse(Some("<DELETE>"))
)
} catch {
case e: UnknownRecordTypeException =>
(
Some(s"Unknown record type ${e.unknownType} at offset ${record.offset}, skipping."),
None
)

case e: Throwable =>
e.printStackTrace()
(
Some(s"Error at offset ${record.offset}, skipping. ${e.getMessage}"),
None
)
}
}
}

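For context, this TransactionLogMessageParser is the parser the dump tool uses for its transaction-log decoding mode. A minimal usage sketch (the segment path below is a placeholder, not taken from this change):

  # Sketch: dump a __transaction_state segment with the transaction-log decoder (placeholder path)
  bin/kafka-dump-log.sh --transaction-log-decoder --files /tmp/kafka-logs/__transaction_state-0/00000000000000000000.log
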
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala
@@ -17,7 +17,6 @@
package kafka.coordinator.transaction


import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.protocol.{ByteBufferAccessor, MessageUtil}
@@ -110,40 +109,6 @@ class TransactionLogTest {
assertEquals(pidMappings.size, count)
}

@Test
def testTransactionMetadataParsing(): Unit = {
val transactionalId = "id"
val producerId = 1334L
val topicPartition = new TopicPartition("topic", 0)

val txnMetadata = new TransactionMetadata(transactionalId, producerId, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_ID, producerEpoch,
RecordBatch.NO_PRODUCER_EPOCH, transactionTimeoutMs, Ongoing, collection.mutable.Set.empty[TopicPartition], 0, 0, TV_0)
txnMetadata.addPartitions(Set(topicPartition))

val keyBytes = TransactionLog.keyToBytes(transactionalId)
val valueBytes = TransactionLog.valueToBytes(txnMetadata.prepareNoTransit(), TV_2)
val transactionMetadataRecord = TestUtils.records(Seq(
new SimpleRecord(keyBytes, valueBytes)
)).records.asScala.head

val (keyStringOpt, valueStringOpt) = TransactionLog.formatRecordKeyAndValue(transactionMetadataRecord)
assertEquals(Some(s"transaction_metadata::transactionalId=$transactionalId"), keyStringOpt)
assertEquals(Some(s"producerId:$producerId,producerEpoch:$producerEpoch,state=Ongoing," +
s"partitions=[$topicPartition],txnLastUpdateTimestamp=0,txnTimeoutMs=$transactionTimeoutMs"), valueStringOpt)
}

@Test
def testTransactionMetadataTombstoneParsing(): Unit = {
val transactionalId = "id"
val transactionMetadataRecord = TestUtils.records(Seq(
new SimpleRecord(TransactionLog.keyToBytes(transactionalId), null)
)).records.asScala.head

val (keyStringOpt, valueStringOpt) = TransactionLog.formatRecordKeyAndValue(transactionMetadataRecord)
assertEquals(Some(s"transaction_metadata::transactionalId=$transactionalId"), keyStringOpt)
assertEquals(Some("<DELETE>"), valueStringOpt)
}

@Test
def testSerializeTransactionLogValueToHighestNonFlexibleVersion(): Unit = {
val txnTransitMetadata = TxnTransitMetadata(1, 1, 1, 1, 1, 1000, CompleteCommit, Set.empty, 500, 500, TV_0)
125 changes: 123 additions & 2 deletions core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -27,7 +27,7 @@ import java.util.stream.IntStream
import kafka.log.{LogTestUtils, UnifiedLog}
import kafka.raft.{KafkaMetadataLog, MetadataLogConfig}
import kafka.server.KafkaRaftServer
import kafka.tools.DumpLogSegments.{OffsetsMessageParser, ShareGroupStateMessageParser, TimeIndexDumpErrors}
import kafka.tools.DumpLogSegments.{OffsetsMessageParser, ShareGroupStateMessageParser, TimeIndexDumpErrors, TransactionLogMessageParser}
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.{Assignment, Subscription}
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
@@ -43,7 +43,8 @@ import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde
import org.apache.kafka.coordinator.group.generated.{ConsumerGroupMemberMetadataValue, ConsumerGroupMetadataKey, ConsumerGroupMetadataValue, GroupMetadataKey, GroupMetadataValue}
import org.apache.kafka.coordinator.share.generated.{ShareSnapshotKey, ShareSnapshotValue, ShareUpdateKey, ShareUpdateValue}
import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRecordSerde}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.coordinator.transaction.generated.{TransactionLogKey, TransactionLogValue}
import org.apache.kafka.coordinator.transaction.{TransactionCoordinatorRecordSerde, TransactionLogConfig}
import org.apache.kafka.metadata.MetadataRecordSerde
import org.apache.kafka.raft.{KafkaRaftClient, OffsetAndEpoch, VoterSetTest}
import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion}
@@ -831,6 +832,126 @@ class DumpLogSegmentsTest {
)
}

@Test
def testTransactionLogMessageParser(): Unit = {
val serde = new TransactionCoordinatorRecordSerde()
val parser = new TransactionLogMessageParser()

def serializedRecord(key: ApiMessageAndVersion, value: ApiMessageAndVersion): Record = {
val record = new CoordinatorRecord(key, value)
TestUtils.singletonRecords(
key = serde.serializeKey(record),
value = serde.serializeValue(record)
).records.iterator.next
}

// The key is mandatory.
assertEquals(
"Failed to decode message at offset 0 using offset transaction-log decoder (message had a missing key)",
assertThrows(
classOf[RuntimeException],
() => parser.parse(TestUtils.singletonRecords(key = null, value = null).records.iterator.next)
).getMessage
)

// A valid key and value should work.
Member: I wonder whether we ought to have some testing of a more complicated value too. There are tagged fields and so on to consider.

Member Author: I've added a testcase with all fields set for TransactionValue. Is this what you had in mind?

Member: I think that will do. The point is to catch a situation where we inadvertently break this.

assertEquals(
(
Some("{\"type\":\"0\",\"data\":{\"transactionalId\":\"txnId\"}}"),
Some("{\"type\":\"0\",\"data\":{\"producerId\":123,\"producerEpoch\":0,\"transactionTimeoutMs\":0," +
"\"transactionStatus\":0,\"transactionPartitions\":[],\"transactionLastUpdateTimestampMs\":0," +
"\"transactionStartTimestampMs\":0}}")
),
parser.parse(serializedRecord(
new ApiMessageAndVersion(
new TransactionLogKey()
.setTransactionalId("txnId"),
0.toShort
),
new ApiMessageAndVersion(
new TransactionLogValue()
.setProducerId(123L),
0.toShort
)
))
)

// A valid key with a tombstone should work.
assertEquals(
(
Some("{\"type\":\"0\",\"data\":{\"transactionalId\":\"txnId\"}}"),
Some("<DELETE>")
),
parser.parse(serializedRecord(
new ApiMessageAndVersion(
new TransactionLogKey()
.setTransactionalId("txnId"),
0.toShort
),
null
))
)

// An unknown record type should be handled and reported as such.
assertEquals(
(
Some("Unknown record type 32767 at offset 0, skipping."),
None
),
parser.parse(serializedRecord(
new ApiMessageAndVersion(
new TransactionLogKey()
.setTransactionalId("txnId"),
Short.MaxValue // Invalid record id.
),
new ApiMessageAndVersion(
new TransactionLogValue(),
0.toShort
)
))
)

// A valid key and value with all fields set should work.
assertEquals(
(
Some("{\"type\":\"0\",\"data\":{\"transactionalId\":\"txnId\"}}"),
Some("{\"type\":\"1\",\"data\":{\"producerId\":12,\"previousProducerId\":11,\"nextProducerId\":10," +
"\"producerEpoch\":2,\"transactionTimeoutMs\":14,\"transactionStatus\":0," +
"\"transactionPartitions\":[{\"topic\":\"topic1\",\"partitionIds\":[0,1,2]}," +
"{\"topic\":\"topic2\",\"partitionIds\":[3,4,5]}],\"transactionLastUpdateTimestampMs\":123," +
"\"transactionStartTimestampMs\":13}}")
),
parser.parse(serializedRecord(
new ApiMessageAndVersion(
new TransactionLogKey()
.setTransactionalId("txnId"),
0.toShort
),
new ApiMessageAndVersion(
new TransactionLogValue()
.setClientTransactionVersion(0.toShort)
.setNextProducerId(10L)
.setPreviousProducerId(11L)
.setProducerEpoch(2.toShort)
.setProducerId(12L)
.setTransactionLastUpdateTimestampMs(123L)
.setTransactionPartitions(List(
new TransactionLogValue.PartitionsSchema()
.setTopic("topic1")
.setPartitionIds(List(0, 1, 2).map(Integer.valueOf).asJava),
new TransactionLogValue.PartitionsSchema()
.setTopic("topic2")
.setPartitionIds(List(3, 4, 5).map(Integer.valueOf).asJava)
).asJava)
.setTransactionStartTimestampMs(13L)
.setTransactionStatus(0)
.setTransactionTimeoutMs(14),
1.toShort
)
))
)
}

private def readBatchMetadata(lines: util.ListIterator[String]): Option[String] = {
while (lines.hasNext) {
val line = lines.next()
@@ -32,7 +32,6 @@
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
import org.apache.kafka.server.util.MockScheduler;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
@@ -785,7 +784,7 @@ private ProducerStateManager newProducerStateManager() throws IOException {
topicPartition,
logDir,
(int) (Duration.ofMinutes(5).toMillis()),
new ProducerStateManagerConfig(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false),
new ProducerStateManagerConfig(86400000, false),
new MockTime()
);
}
@@ -59,7 +59,6 @@
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
import static org.apache.kafka.coordinator.transaction.TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_DEFAULT;
import static org.apache.kafka.storage.internals.log.ProducerStateManager.LATE_TRANSACTION_BUFFER_MS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -89,7 +88,7 @@ public class ProducerStateManagerTest {
public ProducerStateManagerTest() throws IOException {
logDir = TestUtils.tempDirectory();
partition = new TopicPartition("test", 0);
producerStateManagerConfig = new ProducerStateManagerConfig(PRODUCER_ID_EXPIRATION_MS_DEFAULT, true);
producerStateManagerConfig = new ProducerStateManagerConfig(86400000, true);
time = new MockTime();
stateManager = new ProducerStateManager(partition, logDir, maxTransactionTimeoutMs,
producerStateManagerConfig, time);