File: KafkaTestUtils.scala (class KafkaTestUtils, SQL Kafka connector tests)

@@ -28,6 +28,7 @@ import scala.io.Source
 import scala.jdk.CollectionConverters._
 
 import com.google.common.io.Files
+import kafka.api.Request
 import kafka.server.{HostedPartition, KafkaConfig, KafkaServer}
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.zk.KafkaZkClient
@@ -39,7 +40,6 @@ import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.requests.FetchRequest
 import org.apache.kafka.common.security.auth.SecurityProtocol.{PLAINTEXT, SASL_PLAINTEXT}
 import org.apache.kafka.common.serialization.StringSerializer
 import org.apache.kafka.common.utils.SystemTime
@@ -597,7 +597,7 @@ class KafkaTestUtils(
         .getPartitionInfo(topic, partition) match {
       case Some(partitionState) =>
         zkClient.getLeaderForPartition(new TopicPartition(topic, partition)).isDefined &&
-          FetchRequest.isValidBrokerId(partitionState.leader) &&
+          Request.isValidBrokerId(partitionState.leader) &&
          !partitionState.replicas.isEmpty
 
       case _ =>
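
Both copies of KafkaTestUtils in this diff make the same substitution: the broker-id validity helper lived in kafka.api.Request through Kafka 3.4 and moved to org.apache.kafka.common.requests.FetchRequest in later releases, so the revert to 3.4.1 swaps the call sites back. A minimal sketch of the predicate the readiness check relies on, assuming it mirrors the upstream helpers (Kafka reserves negative ids for sentinel fetchers, e.g. -1 for an ordinary consumer):

    // Hedged sketch, not the upstream source: a partition has a usable
    // leader only when the reported leader id is non-negative.
    def isValidBrokerId(brokerId: Int): Boolean = brokerId >= 0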
File: KafkaRDDSuite.scala (class KafkaRDDSuite)

@@ -24,14 +24,12 @@ import scala.concurrent.duration._
 import scala.jdk.CollectionConverters._
 import scala.util.Random
 
-import kafka.log.{LogCleaner, UnifiedLog}
-import kafka.server.BrokerTopicStats
+import kafka.log.{CleanerConfig, LogCleaner, LogConfig, ProducerStateManagerConfig, UnifiedLog}
+import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
 import kafka.utils.Pool
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
 import org.apache.kafka.common.serialization.StringDeserializer
-import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
 import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
 
 import org.apache.spark._
@@ -92,13 +90,13 @@ class KafkaRDDSuite extends SparkFunSuite {
     val dir = new File(logDir, topic + "-" + partition)
     dir.mkdirs()
     val logProps = new ju.Properties()
-    logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
-    logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, java.lang.Float.valueOf(0.1f))
+    logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+    logProps.put(LogConfig.MinCleanableDirtyRatioProp, java.lang.Float.valueOf(0.1f))
     val logDirFailureChannel = new LogDirFailureChannel(1)
     val topicPartition = new TopicPartition(topic, partition)
     val producerIdExpirationMs = Int.MaxValue
-    val producerStateManagerConfig = new ProducerStateManagerConfig(producerIdExpirationMs, false)
-    val logConfig = new LogConfig(logProps)
+    val producerStateManagerConfig = new ProducerStateManagerConfig(producerIdExpirationMs)
+    val logConfig = LogConfig(logProps)
     val log = UnifiedLog(
       dir,
       logConfig,
@@ -122,7 +120,7 @@
     log.roll()
     logs.put(topicPartition, log)
 
-    val cleaner = new LogCleaner(new CleanerConfig(false), Array(dir), logs, logDirFailureChannel)
+    val cleaner = new LogCleaner(CleanerConfig(), Array(dir), logs, logDirFailureChannel)
     cleaner.startup()
     cleaner.awaitCleaned(new TopicPartition(topic, partition), log.activeSegment.baseOffset, 1000)
File: KafkaTestUtils.scala (class KafkaTestUtils, streaming kafka010 connector tests)

@@ -27,12 +27,12 @@ import scala.annotation.tailrec
 import scala.jdk.CollectionConverters._
 import scala.util.control.NonFatal
 
+import kafka.api.Request
 import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.requests.FetchRequest
 import org.apache.kafka.common.serialization.StringSerializer
 import org.apache.kafka.common.utils.{Time => KTime}
 import org.apache.zookeeper.client.ZKClientConfig
@@ -304,7 +304,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
         val leader = partitionState.leader
         val isr = partitionState.isr
         zkClient.getLeaderForPartition(new TopicPartition(topic, partition)).isDefined &&
-          FetchRequest.isValidBrokerId(leader) && !isr.isEmpty
+          Request.isValidBrokerId(leader) && !isr.isEmpty
       case _ =>
         false
     }
File: MockScheduler.scala (org.apache.spark.streaming.kafka010.mocks)

@@ -19,8 +19,8 @@ package org.apache.spark.streaming.kafka010.mocks
 
 import java.util.concurrent.{ScheduledFuture, TimeUnit}
 
+import kafka.utils.Scheduler
 import org.apache.kafka.common.utils.Time
-import org.apache.kafka.server.util.Scheduler
 import org.jmock.lib.concurrent.DeterministicScheduler
 
 /**
@@ -42,6 +42,8 @@ private[kafka010] class MockScheduler(val time: Time) extends Scheduler {
 
   val scheduler = new DeterministicScheduler()
 
+  def isStarted: Boolean = true
+
   def startup(): Unit = {}
 
   def shutdown(): Unit = synchronized {
@@ -54,18 +56,17 @@
 
   def schedule(
       name: String,
-      task: Runnable,
-      delayMs: Long = 0,
-      periodMs: Long = -1): ScheduledFuture[_] = synchronized {
-    if (periodMs >= 0) {
-      scheduler.scheduleAtFixedRate(task, delayMs, periodMs, TimeUnit.MILLISECONDS)
+      fun: () => Unit,
+      delay: Long = 0,
+      period: Long = -1,
+      unit: TimeUnit = TimeUnit.MILLISECONDS): ScheduledFuture[_] = synchronized {
+    val runnable = new Runnable {
+      override def run(): Unit = fun()
+    }
+    if (period >= 0) {
+      scheduler.scheduleAtFixedRate(runnable, delay, period, unit)
     } else {
-      scheduler.schedule(task, delayMs, TimeUnit.MILLISECONDS)
+      scheduler.schedule(runnable, delay, unit)
     }
   }
-
-  override def resizeThreadPool(i: Int): Unit = {
-
-  }
-
 }
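
The reverted signature matches Kafka 3.4's kafka.utils.Scheduler trait, which takes a Scala function rather than a Runnable; jMock's DeterministicScheduler then executes whatever falls due as tests advance a virtual clock, with no real threads or sleeps. An illustrative usage sketch (the test code below is not part of the diff; Time.SYSTEM and DeterministicScheduler.tick are the standard Kafka and jMock APIs):

    import java.util.concurrent.TimeUnit
    import org.apache.kafka.common.utils.Time

    val mock = new MockScheduler(Time.SYSTEM)
    var fired = 0
    mock.schedule("tick-counter", () => fired += 1, delay = 100, period = 100)

    // Advancing virtual time runs every task that falls due along the way:
    // here at t = 100, 200 and 300 ms.
    mock.scheduler.tick(350, TimeUnit.MILLISECONDS)
    assert(fired == 3)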
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
@@ -137,7 +137,7 @@
     <!-- Version used for internal directory structure -->
     <hive.version.short>2.3</hive.version.short>
     <!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
-    <kafka.version>3.6.0</kafka.version>
+    <kafka.version>3.4.1</kafka.version>
     <!-- After 10.15.1.3, the minimum required version is JDK9 -->
     <derby.version>10.14.2.0</derby.version>
     <parquet.version>1.13.1</parquet.version>