From 6252724b77e23d480f1d5fd359efc284df78dc23 Mon Sep 17 00:00:00 2001 From: Lucas Brutschy Date: Sun, 17 Dec 2023 10:26:45 +0100 Subject: [PATCH] Revert "KAFKA-15764: Missing Tests for Transactions (#14702)" This reverts commit ed7ad6d9d3d38940fbeb13324a186d3e5edd539e. --- .../kafka/api/TransactionsTest.scala | 34 ++----------------- .../scala/unit/kafka/utils/TestUtils.scala | 4 +-- 2 files changed, 3 insertions(+), 35 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala index cfacf0f39e587..10e1e2311deec 100644 --- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala +++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala @@ -26,7 +26,6 @@ import kafka.server.KafkaConfig import kafka.utils.{TestInfoUtils, TestUtils} import kafka.utils.TestUtils.{consumeRecords, waitUntilTrue} import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerGroupMetadata, OffsetAndMetadata} -import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.errors.{InvalidProducerEpochException, ProducerFencedException, TimeoutException} import org.apache.kafka.common.TopicPartition @@ -821,33 +820,6 @@ class TransactionsTest extends IntegrationTestHarness { assertEquals((initialProducerEpoch + 1).toShort, producerStateEntry.producerEpoch) } - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) - @ValueSource(strings = Array("zk", "kraft")) - def testTransactionsWithCompression(quorum: String): Unit = { - val numRecords = 50 - val numProducersWithCompression = 5 - val numTransactions = 40 - val transactionalCompressionProducers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]() - - for (i <- 0 until numProducersWithCompression) { - transactionalCompressionProducers += createTransactionalProducer("transactional-compression-producer-" + i.toString, compressionType = "snappy") - } - createTopic("topic", 100, brokerCount, topicConfig()) - transactionalCompressionProducers.foreach(_.initTransactions()) - - for (i <- 0 until numTransactions) { - transactionalCompressionProducers.foreach(_.beginTransaction()) - - for (i <- 0 until numRecords) { - transactionalCompressionProducers.foreach(producer => - producer.send(TestUtils.producerRecordWithExpectedTransactionStatus("topic", null, i.toString, producer.toString, willBeCommitted = true), - new ErrorLoggingCallback("topic", i.toString.getBytes(StandardCharsets.UTF_8), producer.toString.getBytes(StandardCharsets.UTF_8), true)) - ) - } - transactionalCompressionProducers.foreach(_.commitTransaction()) - } - } - private def sendTransactionalMessagesWithValueRange(producer: KafkaProducer[Array[Byte], Array[Byte]], topic: String, start: Int, end: Int, willBeCommitted: Boolean): Unit = { for (i <- start until end) { @@ -880,16 +852,14 @@ class TransactionsTest extends IntegrationTestHarness { transactionTimeoutMs: Long = 60000, maxBlockMs: Long = 60000, deliveryTimeoutMs: Int = 120000, - requestTimeoutMs: Int = 30000, - compressionType: String = "none"): KafkaProducer[Array[Byte], Array[Byte]] = { + requestTimeoutMs: Int = 30000): KafkaProducer[Array[Byte], Array[Byte]] = { val producer = TestUtils.createTransactionalProducer( transactionalId, brokers, transactionTimeoutMs = transactionTimeoutMs, maxBlockMs = maxBlockMs, deliveryTimeoutMs = deliveryTimeoutMs, - requestTimeoutMs = requestTimeoutMs, - compressionType = compressionType + requestTimeoutMs = requestTimeoutMs ) transactionalProducers += producer producer diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index a52d3b5e27987..7887753a9d126 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -1896,8 +1896,7 @@ object TestUtils extends Logging { maxBlockMs: Long = 60000, deliveryTimeoutMs: Int = 120000, requestTimeoutMs: Int = 30000, - maxInFlight: Int = 5, - compressionType: String = "none"): KafkaProducer[Array[Byte], Array[Byte]] = { + maxInFlight: Int = 5): KafkaProducer[Array[Byte], Array[Byte]] = { val props = new Properties() props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, plaintextBootstrapServers(brokers)) props.put(ProducerConfig.ACKS_CONFIG, "all") @@ -1909,7 +1908,6 @@ object TestUtils extends Logging { props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeoutMs.toString) props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs.toString) props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, maxInFlight.toString) - props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType) new KafkaProducer[Array[Byte], Array[Byte]](props, new ByteArraySerializer, new ByteArraySerializer) }