From a25b93353a6ab4e637a77fd38c8449a0653a0e50 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Sun, 15 Oct 2023 22:53:56 +0800 Subject: [PATCH] Revert "[SPARK-45502][BUILD] Upgrade Kafka to 3.6.0" This reverts commit d1bd21a2a219ebe6c5ac3fcb1e17db75af3c670c. --- .../spark/sql/kafka010/KafkaTestUtils.scala | 4 +-- .../streaming/kafka010/KafkaRDDSuite.scala | 16 ++++++------ .../streaming/kafka010/KafkaTestUtils.scala | 4 +-- .../kafka010/mocks/MockScheduler.scala | 25 ++++++++++--------- pom.xml | 2 +- 5 files changed, 25 insertions(+), 26 deletions(-) diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index 2b0c13ed443d4..c54afc6290b13 100644 --- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -28,6 +28,7 @@ import scala.io.Source import scala.jdk.CollectionConverters._ import com.google.common.io.Files +import kafka.api.Request import kafka.server.{HostedPartition, KafkaConfig, KafkaServer} import kafka.server.checkpoints.OffsetCheckpointFile import kafka.zk.KafkaZkClient @@ -39,7 +40,6 @@ import org.apache.kafka.clients.producer._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.network.ListenerName -import org.apache.kafka.common.requests.FetchRequest import org.apache.kafka.common.security.auth.SecurityProtocol.{PLAINTEXT, SASL_PLAINTEXT} import org.apache.kafka.common.serialization.StringSerializer import org.apache.kafka.common.utils.SystemTime @@ -597,7 +597,7 @@ class KafkaTestUtils( .getPartitionInfo(topic, partition) match { case Some(partitionState) => zkClient.getLeaderForPartition(new TopicPartition(topic, partition)).isDefined && - FetchRequest.isValidBrokerId(partitionState.leader) && + Request.isValidBrokerId(partitionState.leader) && !partitionState.replicas.isEmpty case _ => diff --git a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala index ae941b1fddd52..735ec2f7b4484 100644 --- a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala +++ b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala @@ -24,14 +24,12 @@ import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ import scala.util.Random -import kafka.log.{LogCleaner, UnifiedLog} -import kafka.server.BrokerTopicStats +import kafka.log.{CleanerConfig, LogCleaner, LogConfig, ProducerStateManagerConfig, UnifiedLog} +import kafka.server.{BrokerTopicStats, LogDirFailureChannel} import kafka.utils.Pool import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord} import org.apache.kafka.common.serialization.StringDeserializer -import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig} import org.scalatest.concurrent.Eventually.{eventually, interval, timeout} import org.apache.spark._ @@ -92,13 +90,13 @@ class KafkaRDDSuite extends SparkFunSuite { val dir = new File(logDir, topic + "-" + partition) dir.mkdirs() val logProps = new ju.Properties() - logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT) - logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, java.lang.Float.valueOf(0.1f)) + logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) + logProps.put(LogConfig.MinCleanableDirtyRatioProp, java.lang.Float.valueOf(0.1f)) val logDirFailureChannel = new LogDirFailureChannel(1) val topicPartition = new TopicPartition(topic, partition) val producerIdExpirationMs = Int.MaxValue - val producerStateManagerConfig = new ProducerStateManagerConfig(producerIdExpirationMs, false) - val logConfig = new LogConfig(logProps) + val producerStateManagerConfig = new ProducerStateManagerConfig(producerIdExpirationMs) + val logConfig = LogConfig(logProps) val log = UnifiedLog( dir, logConfig, @@ -122,7 +120,7 @@ class KafkaRDDSuite extends SparkFunSuite { log.roll() logs.put(topicPartition, log) - val cleaner = new LogCleaner(new CleanerConfig(false), Array(dir), logs, logDirFailureChannel) + val cleaner = new LogCleaner(CleanerConfig(), Array(dir), logs, logDirFailureChannel) cleaner.startup() cleaner.awaitCleaned(new TopicPartition(topic, partition), log.activeSegment.baseOffset, 1000) diff --git a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala index 1bd9b8bc31600..6a9ef52e990ea 100644 --- a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala +++ b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala @@ -27,12 +27,12 @@ import scala.annotation.tailrec import scala.jdk.CollectionConverters._ import scala.util.control.NonFatal +import kafka.api.Request import kafka.server.{KafkaConfig, KafkaServer} import kafka.zk.{AdminZkClient, KafkaZkClient} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.network.ListenerName -import org.apache.kafka.common.requests.FetchRequest import org.apache.kafka.common.serialization.StringSerializer import org.apache.kafka.common.utils.{Time => KTime} import org.apache.zookeeper.client.ZKClientConfig @@ -304,7 +304,7 @@ private[kafka010] class KafkaTestUtils extends Logging { val leader = partitionState.leader val isr = partitionState.isr zkClient.getLeaderForPartition(new TopicPartition(topic, partition)).isDefined && - FetchRequest.isValidBrokerId(leader) && !isr.isEmpty + Request.isValidBrokerId(leader) && !isr.isEmpty case _ => false } diff --git a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala index 1b7e92a03604e..c0724909bc350 100644 --- a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala +++ b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/mocks/MockScheduler.scala @@ -19,8 +19,8 @@ package org.apache.spark.streaming.kafka010.mocks import java.util.concurrent.{ScheduledFuture, TimeUnit} +import kafka.utils.Scheduler import org.apache.kafka.common.utils.Time -import org.apache.kafka.server.util.Scheduler import org.jmock.lib.concurrent.DeterministicScheduler /** @@ -42,6 +42,8 @@ private[kafka010] class MockScheduler(val time: Time) extends Scheduler { val scheduler = new DeterministicScheduler() + def isStarted: Boolean = true + def startup(): Unit = {} def shutdown(): Unit = synchronized { @@ -54,18 +56,17 @@ private[kafka010] class MockScheduler(val time: Time) extends Scheduler { def schedule( name: String, - task: Runnable, - delayMs: Long = 0, - periodMs: Long = -1): ScheduledFuture[_] = synchronized { - if (periodMs >= 0) { - scheduler.scheduleAtFixedRate(task, delayMs, periodMs, TimeUnit.MILLISECONDS) + fun: () => Unit, + delay: Long = 0, + period: Long = -1, + unit: TimeUnit = TimeUnit.MILLISECONDS): ScheduledFuture[_] = synchronized { + val runnable = new Runnable { + override def run(): Unit = fun() + } + if (period >= 0) { + scheduler.scheduleAtFixedRate(runnable, delay, period, unit) } else { - scheduler.schedule(task, delayMs, TimeUnit.MILLISECONDS) + scheduler.schedule(runnable, delay, unit) } } - - override def resizeThreadPool(i: Int): Unit = { - - } - } diff --git a/pom.xml b/pom.xml index 1a6607acfd16b..4741afd1a640f 100644 --- a/pom.xml +++ b/pom.xml @@ -137,7 +137,7 @@ 2.3 - 3.6.0 + 3.4.1 10.14.2.0 1.13.1