Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 2 additions & 32 deletions core/src/test/scala/integration/kafka/api/TransactionsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import kafka.server.KafkaConfig
import kafka.utils.{TestInfoUtils, TestUtils}
import kafka.utils.TestUtils.{consumeRecords, waitUntilTrue}
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerGroupMetadata, OffsetAndMetadata}
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.errors.{InvalidProducerEpochException, ProducerFencedException, TimeoutException}
import org.apache.kafka.common.TopicPartition
Expand Down Expand Up @@ -821,33 +820,6 @@ class TransactionsTest extends IntegrationTestHarness {
assertEquals((initialProducerEpoch + 1).toShort, producerStateEntry.producerEpoch)
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array("zk", "kraft"))
def testTransactionsWithCompression(quorum: String): Unit = {
val numRecords = 50
val numProducersWithCompression = 5
val numTransactions = 40
val transactionalCompressionProducers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()

for (i <- 0 until numProducersWithCompression) {
transactionalCompressionProducers += createTransactionalProducer("transactional-compression-producer-" + i.toString, compressionType = "snappy")
}
createTopic("topic", 100, brokerCount, topicConfig())
transactionalCompressionProducers.foreach(_.initTransactions())

for (i <- 0 until numTransactions) {
transactionalCompressionProducers.foreach(_.beginTransaction())

for (i <- 0 until numRecords) {
transactionalCompressionProducers.foreach(producer =>
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus("topic", null, i.toString, producer.toString, willBeCommitted = true),
new ErrorLoggingCallback("topic", i.toString.getBytes(StandardCharsets.UTF_8), producer.toString.getBytes(StandardCharsets.UTF_8), true))
)
}
transactionalCompressionProducers.foreach(_.commitTransaction())
}
}

private def sendTransactionalMessagesWithValueRange(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String,
start: Int, end: Int, willBeCommitted: Boolean): Unit = {
for (i <- start until end) {
Expand Down Expand Up @@ -880,16 +852,14 @@ class TransactionsTest extends IntegrationTestHarness {
transactionTimeoutMs: Long = 60000,
maxBlockMs: Long = 60000,
deliveryTimeoutMs: Int = 120000,
requestTimeoutMs: Int = 30000,
compressionType: String = "none"): KafkaProducer[Array[Byte], Array[Byte]] = {
requestTimeoutMs: Int = 30000): KafkaProducer[Array[Byte], Array[Byte]] = {
val producer = TestUtils.createTransactionalProducer(
transactionalId,
brokers,
transactionTimeoutMs = transactionTimeoutMs,
maxBlockMs = maxBlockMs,
deliveryTimeoutMs = deliveryTimeoutMs,
requestTimeoutMs = requestTimeoutMs,
compressionType = compressionType
requestTimeoutMs = requestTimeoutMs
)
transactionalProducers += producer
producer
Expand Down
4 changes: 1 addition & 3 deletions core/src/test/scala/unit/kafka/utils/TestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1896,8 +1896,7 @@ object TestUtils extends Logging {
maxBlockMs: Long = 60000,
deliveryTimeoutMs: Int = 120000,
requestTimeoutMs: Int = 30000,
maxInFlight: Int = 5,
compressionType: String = "none"): KafkaProducer[Array[Byte], Array[Byte]] = {
maxInFlight: Int = 5): KafkaProducer[Array[Byte], Array[Byte]] = {
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, plaintextBootstrapServers(brokers))
props.put(ProducerConfig.ACKS_CONFIG, "all")
Expand All @@ -1909,7 +1908,6 @@ object TestUtils extends Logging {
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeoutMs.toString)
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs.toString)
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, maxInFlight.toString)
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType)
new KafkaProducer[Array[Byte], Array[Byte]](props, new ByteArraySerializer, new ByteArraySerializer)
}

Expand Down