Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/tools/MirrorMaker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
// uncommitted record since last poll. Using one second as poll's timeout ensures that
// offsetCommitIntervalMs, of value greater than 1 second, does not see delays in offset
// commit.
recordIter = consumer.poll(Duration.ofSeconds(1)).iterator
recordIter = consumer.poll(Duration.ofSeconds(1L)).iterator
if (!recordIter.hasNext)
throw new NoRecordsException
}
Expand Down Expand Up @@ -387,7 +387,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
}

def close(timeout: Long) {
this.producer.close(timeout, TimeUnit.MILLISECONDS)
this.producer.close(Duration.ofMillis(timeout))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package kafka.api

import java.util
import java.{time, util}
import java.util.{Collections, Properties}
import java.util.Arrays.asList
import java.util.concurrent.{ExecutionException, TimeUnit}
Expand Down Expand Up @@ -1066,11 +1066,11 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
val topics = Seq("mytopic", "mytopic2")
val newTopics = topics.map(new NewTopic(_, 1, 1))
val future = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all()
client.close(2, TimeUnit.HOURS)
client.close(time.Duration.ofHours(2))
val future2 = client.createTopics(newTopics.asJava, new CreateTopicsOptions().validateOnly(true)).all()
assertFutureExceptionTypeEquals(future2, classOf[TimeoutException])
future.get
client.close(30, TimeUnit.MINUTES) // multiple close-with-timeout should have no effect
client.close(time.Duration.ofMinutes(30)) // multiple close-with-timeout should have no effect
}

/**
Expand All @@ -1086,7 +1086,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
// cancelled by the close operation.
val future = client.createTopics(Seq("mytopic", "mytopic2").map(new NewTopic(_, 1, 1)).asJava,
new CreateTopicsOptions().timeoutMs(900000)).all()
client.close(0, TimeUnit.MILLISECONDS)
client.close(time.Duration.ZERO)
assertFutureExceptionTypeEquals(future, classOf[TimeoutException])
}

Expand Down Expand Up @@ -1164,7 +1164,7 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
override def run {
consumer.subscribe(Collections.singleton(testTopicName))
while (true) {
consumer.poll(5000)
consumer.poll(time.Duration.ofSeconds(5L))
consumer.commitSync()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package kafka.api

import java.time.Duration
import java.nio.charset.StandardCharsets
import java.util.Properties
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -193,7 +194,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
s"value$i".getBytes(StandardCharsets.UTF_8))
producer.send(record)
}
producer.close(timeoutMs, TimeUnit.MILLISECONDS)
producer.close(Duration.ofMillis(timeoutMs))
val lastOffset = futures.foldLeft(0) { (offset, future) =>
val recordMetadata = future.get
assertEquals(topic, recordMetadata.topic)
Expand Down Expand Up @@ -248,7 +249,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
s"value$i".getBytes(StandardCharsets.UTF_8))
(record, producer.send(record, callback))
}
producer.close(20000L, TimeUnit.MILLISECONDS)
producer.close(Duration.ofSeconds(20L))
recordAndFutures.foreach { case (record, future) =>
val recordMetadata = future.get
if (timestampType == TimestampType.LOG_APPEND_TIME)
Expand Down Expand Up @@ -445,7 +446,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
val producer = createProducer(brokerList, lingerMs = Int.MaxValue)
val responses = (0 until numRecords) map (_ => producer.send(record0))
assertTrue("No request is complete.", responses.forall(!_.isDone()))
producer.close(0, TimeUnit.MILLISECONDS)
producer.close(Duration.ZERO)
responses.foreach { future =>
try {
future.get()
Expand All @@ -454,7 +455,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
case e: ExecutionException => assertEquals(classOf[KafkaException], e.getCause.getClass)
}
}
assertEquals("Fetch response should have no message returned.", 0, consumer.poll(50).count)
assertEquals("Fetch response should have no message returned.", 0, consumer.poll(Duration.ofMillis(50L)).count)
}
}

Expand All @@ -476,9 +477,9 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
if (sendRecords)
(0 until numRecords) foreach (_ => producer.send(record))
// The close call will be called by all the message callbacks. This tests idempotence of the close call.
producer.close(0, TimeUnit.MILLISECONDS)
producer.close(Duration.ZERO)
// Test close with non zero timeout. Should not block at all.
producer.close(Long.MaxValue, TimeUnit.MICROSECONDS)
producer.close()
}
}
for (i <- 0 until 50) {
Expand Down
7 changes: 4 additions & 3 deletions core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package kafka.api

import java.time.Duration
import java.util.{Collections, HashMap, Properties}

import kafka.api.QuotaTestClients._
Expand Down Expand Up @@ -140,7 +141,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
val endTimeMs = System.currentTimeMillis + 10000
var throttled = false
while ((!throttled || quotaTestClients.exemptRequestMetric == null) && System.currentTimeMillis < endTimeMs) {
consumer.poll(100)
consumer.poll(Duration.ofMillis(100L))
val throttleMetric = quotaTestClients.throttleMetric(QuotaType.Request, consumerClientId)
throttled = throttleMetric != null && metricValue(throttleMetric) > 0
}
Expand Down Expand Up @@ -197,7 +198,7 @@ abstract class QuotaTestClients(topic: String,
var numConsumed = 0
var throttled = false
do {
numConsumed += consumer.poll(100).count
numConsumed += consumer.poll(Duration.ofMillis(100L)).count
val metric = throttleMetric(QuotaType.Fetch, consumerClientId)
throttled = metric != null && metricValue(metric) > 0
} while (numConsumed < maxRecords && !throttled)
Expand All @@ -206,7 +207,7 @@ abstract class QuotaTestClients(topic: String,
if (throttled && numConsumed < maxRecords && waitForRequestCompletion) {
val minRecords = numConsumed + 1
while (numConsumed < minRecords)
numConsumed += consumer.poll(100).count
numConsumed += consumer.poll(Duration.ofMillis(100L)).count
}
numConsumed
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
executor.schedule(new Runnable {
def run() = createTopic(newtopic, numPartitions = numBrokers, replicationFactor = numBrokers)
}, 2, TimeUnit.SECONDS)
consumer.poll(0)
consumer.poll(time.Duration.ZERO)

val producer = createProducer()

Expand Down Expand Up @@ -481,14 +481,15 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
revokeSemaphore.foreach(s => s.release())
}
})
consumer.poll(0)
// requires to used deprecated `poll(long)` to trigger metadata update
consumer.poll(0L)
}, 0)
}

def waitForRebalance(timeoutMs: Long, future: Future[Any], otherConsumers: KafkaConsumer[Array[Byte], Array[Byte]]*) {
val startMs = System.currentTimeMillis
while (System.currentTimeMillis < startMs + timeoutMs && !future.isDone)
otherConsumers.foreach(consumer => consumer.poll(100))
otherConsumers.foreach(consumer => consumer.poll(time.Duration.ofMillis(100L)))
assertTrue("Rebalance did not complete in time", future.isDone)
}

Expand Down Expand Up @@ -569,7 +570,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
val closeGraceTimeMs = 2000
val startNanos = System.nanoTime
info("Closing consumer with timeout " + closeTimeoutMs + " ms.")
consumer.close(closeTimeoutMs, TimeUnit.MILLISECONDS)
consumer.close(time.Duration.ofMillis(closeTimeoutMs))
val timeTakenMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime - startNanos)
maxCloseTimeMs.foreach { ms =>
assertTrue("Close took too long " + timeTakenMs, timeTakenMs < ms + closeGraceTimeMs)
Expand All @@ -592,7 +593,7 @@ class ConsumerBounceTest extends BaseRequestTest with Logging {
}
def onPartitionsRevoked(partitions: Collection[TopicPartition]) {
}})
consumer.poll(3000)
consumer.poll(time.Duration.ofSeconds(3L))
assertTrue("Assignment did not complete on time", assignSemaphore.tryAcquire(1, TimeUnit.SECONDS))
if (committedRecords > 0)
assertEquals(committedRecords, consumer.committed(tp).offset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ abstract class IntegrationTestHarness extends KafkaServerTestHarness {

@After
override def tearDown() {
producers.foreach(_.close(0, TimeUnit.MILLISECONDS))
producers.foreach(_.close(Duration.ZERO))
consumers.foreach(_.wakeup())
consumers.foreach(_.close(Duration.ZERO))
producers.clear()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package kafka.api

import java.lang.{Long => JLong}
import java.time.Duration
import java.util.{Optional, Properties}
import java.util.concurrent.TimeUnit

Expand Down Expand Up @@ -578,7 +579,7 @@ class TransactionsTest extends KafkaServerTestHarness {
try {
producer.commitTransaction()
} finally {
producer.close(0, TimeUnit.MILLISECONDS)
producer.close(Duration.ZERO)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
clientThreads.foreach(_.initiateShutdown())
clientThreads.foreach(_.join(5 * 1000))
executors.foreach(_.shutdownNow())
producers.foreach(_.close(0, TimeUnit.MILLISECONDS))
producers.foreach(_.close(Duration.ZERO))
consumers.foreach(_.close(Duration.ofMillis(0)))
adminClients.foreach(_.close())
TestUtils.shutdownServers(servers)
Expand Down Expand Up @@ -1470,7 +1470,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
override def doWork(): Unit = {
try {
while (isRunning || (lastReceived != producerThread.lastSent && System.currentTimeMillis < endTimeMs)) {
val records = consumer.poll(50)
val records = consumer.poll(Duration.ofMillis(50L))
received += records.count
if (!records.isEmpty) {
lastBatch = records
Expand Down