From 0fe456b48d93ed24cc59446b79ccfb32694295bc Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Mon, 19 Mar 2018 20:04:04 -0700 Subject: [PATCH 1/7] [SPARK-19185] [DSTREAMS] Avoid concurrent use of cached consumers in CachedKafkaConsumer --- .../kafka010/CachedKafkaConsumer.scala | 226 ----------- .../kafka010/KafkaDataConsumer.scala | 381 ++++++++++++++++++ .../spark/streaming/kafka010/KafkaRDD.scala | 19 +- .../kafka010/KafkaDataConsumerSuite.scala | 111 +++++ 4 files changed, 498 insertions(+), 239 deletions(-) delete mode 100644 external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala create mode 100644 external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala create mode 100644 external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala deleted file mode 100644 index aeb8c1dc342b3..0000000000000 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala +++ /dev/null @@ -1,226 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.kafka010 - -import java.{ util => ju } - -import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord, KafkaConsumer } -import org.apache.kafka.common.{ KafkaException, TopicPartition } - -import org.apache.spark.internal.Logging - -/** - * Consumer of single topicpartition, intended for cached reuse. - * Underlying consumer is not threadsafe, so neither is this, - * but processing the same topicpartition and group id in multiple threads is usually bad anyway. 
- */ -private[kafka010] -class CachedKafkaConsumer[K, V] private( - val groupId: String, - val topic: String, - val partition: Int, - val kafkaParams: ju.Map[String, Object]) extends Logging { - - require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG), - "groupId used for cache key must match the groupId in kafkaParams") - - val topicPartition = new TopicPartition(topic, partition) - - protected val consumer = { - val c = new KafkaConsumer[K, V](kafkaParams) - val tps = new ju.ArrayList[TopicPartition]() - tps.add(topicPartition) - c.assign(tps) - c - } - - // TODO if the buffer was kept around as a random-access structure, - // could possibly optimize re-calculating of an RDD in the same batch - protected var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]() - protected var nextOffset = -2L - - def close(): Unit = consumer.close() - - /** - * Get the record for the given offset, waiting up to timeout ms if IO is necessary. - * Sequential forward access will use buffers, but random access will be horribly inefficient. - */ - def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = { - logDebug(s"Get $groupId $topic $partition nextOffset $nextOffset requested $offset") - if (offset != nextOffset) { - logInfo(s"Initial fetch for $groupId $topic $partition $offset") - seek(offset) - poll(timeout) - } - - if (!buffer.hasNext()) { poll(timeout) } - require(buffer.hasNext(), - s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout") - var record = buffer.next() - - if (record.offset != offset) { - logInfo(s"Buffer miss for $groupId $topic $partition $offset") - seek(offset) - poll(timeout) - require(buffer.hasNext(), - s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout") - record = buffer.next() - require(record.offset == offset, - s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset " + - s"got offset ${record.offset} instead. If this is a compacted topic, consider enabling " + - "spark.streaming.kafka.allowNonConsecutiveOffsets" - ) - } - - nextOffset = offset + 1 - record - } - - /** - * Start a batch on a compacted topic - */ - def compactedStart(offset: Long, timeout: Long): Unit = { - logDebug(s"compacted start $groupId $topic $partition starting $offset") - // This seek may not be necessary, but it's hard to tell due to gaps in compacted topics - if (offset != nextOffset) { - logInfo(s"Initial fetch for compacted $groupId $topic $partition $offset") - seek(offset) - poll(timeout) - } - } - - /** - * Get the next record in the batch from a compacted topic. - * Assumes compactedStart has been called first, and ignores gaps. - */ - def compactedNext(timeout: Long): ConsumerRecord[K, V] = { - if (!buffer.hasNext()) { - poll(timeout) - } - require(buffer.hasNext(), - s"Failed to get records for compacted $groupId $topic $partition after polling for $timeout") - val record = buffer.next() - nextOffset = record.offset + 1 - record - } - - /** - * Rewind to previous record in the batch from a compacted topic. 
- * @throws NoSuchElementException if no previous element - */ - def compactedPrevious(): ConsumerRecord[K, V] = { - buffer.previous() - } - - private def seek(offset: Long): Unit = { - logDebug(s"Seeking to $topicPartition $offset") - consumer.seek(topicPartition, offset) - } - - private def poll(timeout: Long): Unit = { - val p = consumer.poll(timeout) - val r = p.records(topicPartition) - logDebug(s"Polled ${p.partitions()} ${r.size}") - buffer = r.listIterator - } - -} - -private[kafka010] -object CachedKafkaConsumer extends Logging { - - private case class CacheKey(groupId: String, topic: String, partition: Int) - - // Don't want to depend on guava, don't want a cleanup thread, use a simple LinkedHashMap - private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] = null - - /** Must be called before get, once per JVM, to configure the cache. Further calls are ignored */ - def init( - initialCapacity: Int, - maxCapacity: Int, - loadFactor: Float): Unit = CachedKafkaConsumer.synchronized { - if (null == cache) { - logInfo(s"Initializing cache $initialCapacity $maxCapacity $loadFactor") - cache = new ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]]( - initialCapacity, loadFactor, true) { - override def removeEldestEntry( - entry: ju.Map.Entry[CacheKey, CachedKafkaConsumer[_, _]]): Boolean = { - if (this.size > maxCapacity) { - try { - entry.getValue.consumer.close() - } catch { - case x: KafkaException => - logError("Error closing oldest Kafka consumer", x) - } - true - } else { - false - } - } - } - } - } - - /** - * Get a cached consumer for groupId, assigned to topic and partition. - * If matching consumer doesn't already exist, will be created using kafkaParams. - */ - def get[K, V]( - groupId: String, - topic: String, - partition: Int, - kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer[K, V] = - CachedKafkaConsumer.synchronized { - val k = CacheKey(groupId, topic, partition) - val v = cache.get(k) - if (null == v) { - logInfo(s"Cache miss for $k") - logDebug(cache.keySet.toString) - val c = new CachedKafkaConsumer[K, V](groupId, topic, partition, kafkaParams) - cache.put(k, c) - c - } else { - // any given topicpartition should have a consistent key and value type - v.asInstanceOf[CachedKafkaConsumer[K, V]] - } - } - - /** - * Get a fresh new instance, unassociated with the global cache. - * Caller is responsible for closing - */ - def getUncached[K, V]( - groupId: String, - topic: String, - partition: Int, - kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer[K, V] = - new CachedKafkaConsumer[K, V](groupId, topic, partition, kafkaParams) - - /** remove consumer for given groupId, topic, and partition, if it exists */ - def remove(groupId: String, topic: String, partition: Int): Unit = { - val k = CacheKey(groupId, topic, partition) - logInfo(s"Removing $k from cache") - val v = CachedKafkaConsumer.synchronized { - cache.remove(k) - } - if (null != v) { - v.close() - logInfo(s"Removed $k from cache") - } - } -} diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala new file mode 100644 index 0000000000000..3965bfca0f01b --- /dev/null +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010 + +import java.{util => ju} + +import scala.collection.JavaConverters._ + +import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer} +import org.apache.kafka.common.{KafkaException, TopicPartition} + +import org.apache.spark.TaskContext +import org.apache.spark.internal.Logging + +private[kafka010] sealed trait KafkaDataConsumer[K, V] { + /** + * Get the record for the given offset if available. + * + * @param offset the offset to fetch. + * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka. + */ + def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[K, V] = { + internalConsumer.get(offset, pollTimeoutMs) + } + + /** + * Start a batch on a compacted topic + * + * @param offset the offset to fetch. + * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka. + */ + def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = { + internalConsumer.compactedStart(offset, pollTimeoutMs) + } + + /** + * Get the next record in the batch from a compacted topic. + * Assumes compactedStart has been called first, and ignores gaps. + * + * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka. + */ + def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = { + internalConsumer.compactedNext(pollTimeoutMs) + } + + /** + * Rewind to previous record in the batch from a compacted topic. + * + * @throws NoSuchElementException if no previous element + */ + def compactedPrevious(): ConsumerRecord[K, V] = { + internalConsumer.compactedPrevious() + } + + /** + * Release this consumer from being further used. Depending on its implementation, + * this consumer will be either finalized, or reset for reuse later. + */ + def release(): Unit + + /** Reference to the internal implementation that this wrapper delegates to */ + protected def internalConsumer: InternalKafkaConsumer[K, V] +} + + +/** + * A wrapper around Kafka's KafkaConsumer. + * This is not for direct use outside this file. 
+ */ +private[kafka010] +class InternalKafkaConsumer[K, V]( + val groupId: String, + val topicPartition: TopicPartition, + val kafkaParams: ju.Map[String, Object]) extends Logging { + + require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG), + "groupId used for cache key must match the groupId in kafkaParams") + + @volatile private var consumer = createConsumer + + /** indicates whether this consumer is in use or not */ + @volatile var inUse = true + + /** indicate whether this consumer is going to be stopped in the next release */ + @volatile var markedForClose = false + + // TODO if the buffer was kept around as a random-access structure, + // could possibly optimize re-calculating of an RDD in the same batch + @volatile private var buffer = ju.Collections.emptyListIterator[ConsumerRecord[K, V]]() + @volatile private var nextOffset = InternalKafkaConsumer.UNKNOWN_OFFSET + + override def toString: String = { + "InternalKafkaConsumer(" + + s"hash=${Integer.toHexString(hashCode)}, " + + s"groupId=$groupId, " + + s"topicPartition=$topicPartition)" + } + + /** Create a KafkaConsumer to fetch records for `topicPartition` */ + private def createConsumer: KafkaConsumer[K, V] = { + val c = new KafkaConsumer[K, V](kafkaParams) + val tps = new ju.ArrayList[TopicPartition]() + tps.add(topicPartition) + c.assign(tps) + c + } + + def close(): Unit = consumer.close() + + /** + * Get the record for the given offset, waiting up to timeout ms if IO is necessary. + * Sequential forward access will use buffers, but random access will be horribly inefficient. + */ + def get(offset: Long, timeout: Long): ConsumerRecord[K, V] = { + logDebug(s"Get $groupId $topicPartition nextOffset $nextOffset requested $offset") + if (offset != nextOffset) { + logInfo(s"Initial fetch for $groupId $topicPartition $offset") + seek(offset) + poll(timeout) + } + + if (!buffer.hasNext()) { poll(timeout) } + require(buffer.hasNext(), + s"Failed to get records for $groupId $topicPartition $offset after polling for $timeout") + var record = buffer.next() + + if (record.offset != offset) { + logInfo(s"Buffer miss for $groupId $topicPartition $offset") + seek(offset) + poll(timeout) + require(buffer.hasNext(), + s"Failed to get records for $groupId $topicPartition $offset after polling for $timeout") + record = buffer.next() + require(record.offset == offset, + s"Got wrong record for $groupId $topicPartition even after seeking to offset $offset " + + s"got offset ${record.offset} instead. If this is a compacted topic, consider enabling " + + "spark.streaming.kafka.allowNonConsecutiveOffsets" + ) + } + + nextOffset = offset + 1 + record + } + + /** + * Start a batch on a compacted topic + */ + def compactedStart(offset: Long, pollTimeoutMs: Long): Unit = { + logDebug(s"compacted start $groupId $topicPartition starting $offset") + // This seek may not be necessary, but it's hard to tell due to gaps in compacted topics + if (offset != nextOffset) { + logInfo(s"Initial fetch for compacted $groupId $topicPartition $offset") + seek(offset) + poll(pollTimeoutMs) + } + } + + /** + * Get the next record in the batch from a compacted topic. + * Assumes compactedStart has been called first, and ignores gaps. 
+ */ + def compactedNext(pollTimeoutMs: Long): ConsumerRecord[K, V] = { + if (!buffer.hasNext()) { + poll(pollTimeoutMs) + } + require(buffer.hasNext(), + s"Failed to get records for compacted $groupId $topicPartition " + + s"after polling for $pollTimeoutMs") + val record = buffer.next() + nextOffset = record.offset + 1 + record + } + + /** + * Rewind to previous record in the batch from a compacted topic. + * @throws NoSuchElementException if no previous element + */ + def compactedPrevious(): ConsumerRecord[K, V] = { + buffer.previous() + } + + private def seek(offset: Long): Unit = { + logDebug(s"Seeking to $topicPartition $offset") + consumer.seek(topicPartition, offset) + } + + private def poll(timeout: Long): Unit = { + val p = consumer.poll(timeout) + val r = p.records(topicPartition) + logDebug(s"Polled ${p.partitions()} ${r.size}") + buffer = r.listIterator + } + +} + +private[kafka010] +object KafkaDataConsumer extends Logging { + + private case class CachedKafkaDataConsumer[K, V](internalConsumer: InternalKafkaConsumer[K, V]) + extends KafkaDataConsumer[K, V] { + assert(internalConsumer.inUse) // make sure this has been set to true + override def release(): Unit = { KafkaDataConsumer.release(internalConsumer) } + } + + private case class NonCachedKafkaDataConsumer[K, V](internalConsumer: InternalKafkaConsumer[K, V]) + extends KafkaDataConsumer[K, V] { + override def release(): Unit = { internalConsumer.close() } + } + + private case class CacheKey(groupId: String, topicPartition: TopicPartition) + + // Don't want to depend on guava, don't want a cleanup thread, use a simple LinkedHashMap + private var cache: ju.Map[CacheKey, ju.List[InternalKafkaConsumer[_, _]]] = null + + /** + * Must be called before acquire, once per JVM, to configure the cache. + * Further calls are ignored. + * */ + def init( + initialCapacity: Int, + maxCapacity: Int, + loadFactor: Float): Unit = synchronized { + if (null == cache) { + logInfo(s"Initializing cache $initialCapacity $maxCapacity $loadFactor") + cache = new ju.LinkedHashMap[CacheKey, ju.List[InternalKafkaConsumer[_, _]]]( + initialCapacity, loadFactor, true) { + override def removeEldestEntry( + entry: ju.Map.Entry[CacheKey, ju.List[InternalKafkaConsumer[_, _]]]): Boolean = { + if (this.size > maxCapacity) { + try { + entry.getValue.asScala.foreach { _.close() } + } catch { + case x: KafkaException => + logError("Error closing oldest Kafka consumer", x) + } + true + } else { + false + } + } + } + } + } + + /** + * Get a cached consumer for groupId, assigned to topic and partition. + * If matching consumer doesn't already exist, will be created using kafkaParams. + * The returned consumer must be released explicitly using [[KafkaDataConsumer.release()]]. + * + * Note: This method guarantees that the consumer returned is not currently in use by anyone + * else. Within this guarantee, this method will make a best effort attempt to re-use consumers by + * caching them and tracking when they are in use. 
+ */ + def acquire[K, V]( + groupId: String, + topicPartition: TopicPartition, + kafkaParams: ju.Map[String, Object], + context: TaskContext, + useCache: Boolean): KafkaDataConsumer[K, V] = synchronized { + val key = new CacheKey(groupId, topicPartition) + val existingInternalConsumers = Option(cache.get(key)) + .getOrElse(new ju.LinkedList[InternalKafkaConsumer[_, _]]) + + cache.putIfAbsent(key, existingInternalConsumers) + + lazy val newInternalConsumer = new InternalKafkaConsumer[K, V]( + groupId, topicPartition, kafkaParams) + + if (context != null && context.attemptNumber >= 1) { + // If this is reattempt at running the task, then invalidate cached consumers if any and + // start with a new one. + logDebug("Reattempt detected, invalidating cached consumers") + val closedExistingInternalConsumers = new ju.LinkedList[InternalKafkaConsumer[_, _]]() + existingInternalConsumers.asScala.foreach { existingInternalConsumer => + // Consumer exists in cache. If its in use, mark it for closing later, or close it now. + if (existingInternalConsumer.inUse) { + existingInternalConsumer.markedForClose = true + } else { + existingInternalConsumer.close() + closedExistingInternalConsumers.add(existingInternalConsumer) + } + } + existingInternalConsumers.removeAll(closedExistingInternalConsumers) + + logDebug("Reattempt detected, new cached consumer will be allocated " + + s"$newInternalConsumer") + newInternalConsumer.inUse = true + existingInternalConsumers.add(newInternalConsumer) + CachedKafkaDataConsumer(newInternalConsumer) + + } else if (!useCache) { + // If consumer reuse turned off, then do not use it, return a new consumer + logDebug("Cache usage turned off, new non-cached consumer will be allocated " + + s"$newInternalConsumer") + newInternalConsumer.inUse = true + NonCachedKafkaDataConsumer(newInternalConsumer) + + } else if (existingInternalConsumers.isEmpty) { + // If no consumer already cached, then put a new one into the cache and return it + logDebug("No cached consumer, new cached consumer will be allocated " + + s"$newInternalConsumer") + newInternalConsumer.inUse = true + existingInternalConsumers.add(newInternalConsumer) + CachedKafkaDataConsumer(newInternalConsumer) + + } else { + // If consumers are already cached find a currently not used + existingInternalConsumers.asScala.find(!_.inUse) match { + // If found a currently not used, then return that consumer + case Some(existingInternalConsumer) => + logDebug("Not used cached consumer found, re-using it " + + s"$existingInternalConsumer") + existingInternalConsumer.inUse = true + // Any given TopicPartition should have a consistent key and value type + CachedKafkaDataConsumer( + existingInternalConsumer.asInstanceOf[InternalKafkaConsumer[K, V]]) + case None => + // If every consumer is currently used, return a new consumer + logDebug("All cached consumers used, new cached consumer will be allocated " + + s"$newInternalConsumer") + newInternalConsumer.inUse = true + existingInternalConsumers.add(newInternalConsumer) + CachedKafkaDataConsumer(newInternalConsumer) + } + } + } + + private def release(internalConsumer: InternalKafkaConsumer[_, _]): Unit = synchronized { + // Clear the consumer from the cache if this is indeed the consumer present in the cache + val key = new CacheKey(internalConsumer.groupId, internalConsumer.topicPartition) + Option(cache.get(key)) match { + case Some(existingInternalConsumers) => + existingInternalConsumers.asScala.find(_.eq(internalConsumer)) match { + case Some(existingInternalConsumer) => + // 
The released consumer is the same object as the cached one. + if (existingInternalConsumer.markedForClose) { + logDebug(s"Consumer marked for close, closing it $existingInternalConsumer") + existingInternalConsumer.close() + existingInternalConsumers.remove(existingInternalConsumer) + } else { + logDebug("Consumer not marked for close, put back to cache " + + s"$existingInternalConsumer") + existingInternalConsumer.inUse = false + } + case None => + // The released consumer is either not the same one as in the cache, or not in the cache + // at all. This may happen if the cache was invalidate while this consumer was being + // used. Just close this consumer. + internalConsumer.close() + logWarning("Released a supposedly cached consumer that was not found in the " + + s"cache $internalConsumer") + } + case None => + // The consumer list is not even initialized. This may happen when no consumer acquired for + // a specific groupId and topicPartition. This should normally not happen. + // Just close this consumer. + internalConsumer.close() + logWarning("Released a supposedly cached consumer that was not found in the cache " + + s"because consumer list not allocated $internalConsumer") + } + } +} + +private[kafka010] object InternalKafkaConsumer { + private val UNKNOWN_OFFSET = -2L +} diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala index 07239eda64d2e..3b4733e5eee17 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala @@ -19,8 +19,6 @@ package org.apache.spark.streaming.kafka010 import java.{ util => ju } -import scala.collection.mutable.ArrayBuffer - import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord } import org.apache.kafka.common.TopicPartition @@ -243,22 +241,17 @@ private class KafkaRDDIterator[K, V]( context.addTaskCompletionListener(_ => closeIfNeeded()) - val consumer = if (useConsumerCache) { - CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor) - if (context.attemptNumber >= 1) { - // just in case the prior attempt failures were cache related - CachedKafkaConsumer.remove(groupId, part.topic, part.partition) - } - CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams) - } else { - CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams) + val consumer = { + KafkaDataConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor) + KafkaDataConsumer.acquire[K, V]( + groupId, part.topicPartition(), kafkaParams, context, useConsumerCache) } var requestOffset = part.fromOffset def closeIfNeeded(): Unit = { - if (!useConsumerCache && consumer != null) { - consumer.close() + if (consumer != null) { + consumer.release() } } diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala new file mode 100644 index 0000000000000..a08342376f397 --- /dev/null +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kafka010 + +import java.util.concurrent.{Executors, TimeUnit} + +import scala.collection.JavaConverters._ +import scala.util.Random + +import org.apache.kafka.clients.consumer.ConsumerConfig._ +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.scalatest.BeforeAndAfterAll + +import org.apache.spark._ + +class KafkaDataConsumerSuite extends SparkFunSuite with BeforeAndAfterAll { + + private var testUtils: KafkaTestUtils = _ + + override def beforeAll { + super.beforeAll() + testUtils = new KafkaTestUtils + testUtils.setup() + } + + override def afterAll { + if (testUtils != null) { + testUtils.teardown() + testUtils = null + } + super.afterAll() + } + + test("concurrent use of KafkaDataConsumer") { + KafkaDataConsumer.init(16, 64, 0.75f) + + val topic = "topic" + Random.nextInt() + val data = (1 to 1000).map(_.toString) + val topicPartition = new TopicPartition(topic, 0) + testUtils.createTopic(topic) + testUtils.sendMessages(topic, data.toArray) + + val groupId = "groupId" + val kafkaParams = Map[String, Object]( + GROUP_ID_CONFIG -> groupId, + BOOTSTRAP_SERVERS_CONFIG -> testUtils.brokerAddress, + KEY_DESERIALIZER_CLASS_CONFIG -> classOf[ByteArrayDeserializer].getName, + VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[ByteArrayDeserializer].getName, + AUTO_OFFSET_RESET_CONFIG -> "earliest", + ENABLE_AUTO_COMMIT_CONFIG -> "false" + ) + + val numThreads = 100 + val numConsumerUsages = 500 + + @volatile var error: Throwable = null + + def consume(i: Int): Unit = { + val useCache = Random.nextBoolean + val taskContext = if (Random.nextBoolean) { + new TaskContextImpl(0, 0, 0, 0, attemptNumber = Random.nextInt(2), null, null, null) + } else { + null + } + val consumer = KafkaDataConsumer.acquire[Array[Byte], Array[Byte]]( + groupId, topicPartition, kafkaParams.asJava, taskContext, useCache) + try { + val rcvd = 0 until data.length map { offset => + val bytes = consumer.get(offset, 10000).value() + new String(bytes) + } + assert(rcvd == data) + } catch { + case e: Throwable => + error = e + throw e + } finally { + consumer.release() + } + } + + val threadPool = Executors.newFixedThreadPool(numThreads) + try { + val futures = (1 to numConsumerUsages).map { i => + threadPool.submit(new Runnable { + override def run(): Unit = { consume(i) } + }) + } + futures.foreach(_.get(1, TimeUnit.MINUTES)) + assert(error == null) + } finally { + threadPool.shutdown() + } + } +} From d776289c06d28951bfef78e9eaa81e3a464c9fc4 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Thu, 12 Apr 2018 16:07:49 +0200 Subject: [PATCH 2/7] Review item fixes part1 --- .../kafka010/KafkaDataConsumer.scala | 56 ++++++++----------- .../spark/streaming/kafka010/KafkaRDD.scala | 5 +- .../kafka010/KafkaDataConsumerSuite.scala | 4 +- 3 
files changed, 27 insertions(+), 38 deletions(-) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala index 3965bfca0f01b..dbc7c16bd8612 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala @@ -82,22 +82,20 @@ private[kafka010] sealed trait KafkaDataConsumer[K, V] { * A wrapper around Kafka's KafkaConsumer. * This is not for direct use outside this file. */ -private[kafka010] -class InternalKafkaConsumer[K, V]( - val groupId: String, - val topicPartition: TopicPartition, - val kafkaParams: ju.Map[String, Object]) extends Logging { +private class InternalKafkaConsumer[K, V]( + val topicPartition: TopicPartition, + val kafkaParams: ju.Map[String, Object]) extends Logging { - require(groupId == kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG), - "groupId used for cache key must match the groupId in kafkaParams") + private[kafka010] val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG) + .asInstanceOf[String] - @volatile private var consumer = createConsumer + private val consumer = createConsumer /** indicates whether this consumer is in use or not */ - @volatile var inUse = true + var inUse = true /** indicate whether this consumer is going to be stopped in the next release */ - @volatile var markedForClose = false + var markedForClose = false // TODO if the buffer was kept around as a random-access structure, // could possibly optimize re-calculating of an RDD in the same batch @@ -114,9 +112,8 @@ class InternalKafkaConsumer[K, V]( /** Create a KafkaConsumer to fetch records for `topicPartition` */ private def createConsumer: KafkaConsumer[K, V] = { val c = new KafkaConsumer[K, V](kafkaParams) - val tps = new ju.ArrayList[TopicPartition]() - tps.add(topicPartition) - c.assign(tps) + val topics = ju.Arrays.asList(topicPartition) + c.assign(topics) c } @@ -134,7 +131,9 @@ class InternalKafkaConsumer[K, V]( poll(timeout) } - if (!buffer.hasNext()) { poll(timeout) } + if (!buffer.hasNext()) { + poll(timeout) + } require(buffer.hasNext(), s"Failed to get records for $groupId $topicPartition $offset after polling for $timeout") var record = buffer.next() @@ -213,19 +212,19 @@ object KafkaDataConsumer extends Logging { private case class CachedKafkaDataConsumer[K, V](internalConsumer: InternalKafkaConsumer[K, V]) extends KafkaDataConsumer[K, V] { - assert(internalConsumer.inUse) // make sure this has been set to true - override def release(): Unit = { KafkaDataConsumer.release(internalConsumer) } + assert(internalConsumer.inUse) + override def release(): Unit = KafkaDataConsumer.release(internalConsumer) } private case class NonCachedKafkaDataConsumer[K, V](internalConsumer: InternalKafkaConsumer[K, V]) extends KafkaDataConsumer[K, V] { - override def release(): Unit = { internalConsumer.close() } + override def release(): Unit = internalConsumer.close() } private case class CacheKey(groupId: String, topicPartition: TopicPartition) // Don't want to depend on guava, don't want a cleanup thread, use a simple LinkedHashMap - private var cache: ju.Map[CacheKey, ju.List[InternalKafkaConsumer[_, _]]] = null + private[kafka010] var cache: ju.Map[CacheKey, ju.List[InternalKafkaConsumer[_, _]]] = null /** * Must be called before acquire, once per JVM, to configure the cache. 
@@ -240,10 +239,10 @@ object KafkaDataConsumer extends Logging { cache = new ju.LinkedHashMap[CacheKey, ju.List[InternalKafkaConsumer[_, _]]]( initialCapacity, loadFactor, true) { override def removeEldestEntry( - entry: ju.Map.Entry[CacheKey, ju.List[InternalKafkaConsumer[_, _]]]): Boolean = { + entry: ju.Map.Entry[CacheKey, ju.List[InternalKafkaConsumer[_, _]]]): Boolean = { if (this.size > maxCapacity) { try { - entry.getValue.asScala.foreach { _.close() } + entry.getValue.asScala.foreach(_.close()) } catch { case x: KafkaException => logError("Error closing oldest Kafka consumer", x) @@ -267,27 +266,27 @@ object KafkaDataConsumer extends Logging { * caching them and tracking when they are in use. */ def acquire[K, V]( - groupId: String, topicPartition: TopicPartition, kafkaParams: ju.Map[String, Object], context: TaskContext, useCache: Boolean): KafkaDataConsumer[K, V] = synchronized { + val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] val key = new CacheKey(groupId, topicPartition) val existingInternalConsumers = Option(cache.get(key)) .getOrElse(new ju.LinkedList[InternalKafkaConsumer[_, _]]) cache.putIfAbsent(key, existingInternalConsumers) - lazy val newInternalConsumer = new InternalKafkaConsumer[K, V]( - groupId, topicPartition, kafkaParams) + lazy val newInternalConsumer = new InternalKafkaConsumer[K, V](topicPartition, kafkaParams) if (context != null && context.attemptNumber >= 1) { // If this is reattempt at running the task, then invalidate cached consumers if any and - // start with a new one. + // start with a new one. If prior attempt failures were cache related then this way old + // problematic consumers can be removed. logDebug("Reattempt detected, invalidating cached consumers") val closedExistingInternalConsumers = new ju.LinkedList[InternalKafkaConsumer[_, _]]() existingInternalConsumers.asScala.foreach { existingInternalConsumer => - // Consumer exists in cache. If its in use, mark it for closing later, or close it now. + // Consumer exists in cache. If it's in use, mark it for closing later, or close it now. 
if (existingInternalConsumer.inUse) { existingInternalConsumer.markedForClose = true } else { @@ -299,25 +298,19 @@ object KafkaDataConsumer extends Logging { logDebug("Reattempt detected, new cached consumer will be allocated " + s"$newInternalConsumer") - newInternalConsumer.inUse = true existingInternalConsumers.add(newInternalConsumer) CachedKafkaDataConsumer(newInternalConsumer) - } else if (!useCache) { // If consumer reuse turned off, then do not use it, return a new consumer logDebug("Cache usage turned off, new non-cached consumer will be allocated " + s"$newInternalConsumer") - newInternalConsumer.inUse = true NonCachedKafkaDataConsumer(newInternalConsumer) - } else if (existingInternalConsumers.isEmpty) { // If no consumer already cached, then put a new one into the cache and return it logDebug("No cached consumer, new cached consumer will be allocated " + s"$newInternalConsumer") - newInternalConsumer.inUse = true existingInternalConsumers.add(newInternalConsumer) CachedKafkaDataConsumer(newInternalConsumer) - } else { // If consumers are already cached find a currently not used existingInternalConsumers.asScala.find(!_.inUse) match { @@ -333,7 +326,6 @@ object KafkaDataConsumer extends Logging { // If every consumer is currently used, return a new consumer logDebug("All cached consumers used, new cached consumer will be allocated " + s"$newInternalConsumer") - newInternalConsumer.inUse = true existingInternalConsumers.add(newInternalConsumer) CachedKafkaDataConsumer(newInternalConsumer) } diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala index 3b4733e5eee17..81abc9860bfc3 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala @@ -237,14 +237,11 @@ private class KafkaRDDIterator[K, V]( cacheLoadFactor: Float ) extends Iterator[ConsumerRecord[K, V]] { - val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] - context.addTaskCompletionListener(_ => closeIfNeeded()) val consumer = { KafkaDataConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor) - KafkaDataConsumer.acquire[K, V]( - groupId, part.topicPartition(), kafkaParams, context, useConsumerCache) + KafkaDataConsumer.acquire[K, V](part.topicPartition(), kafkaParams, context, useConsumerCache) } var requestOffset = part.fromOffset diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala index a08342376f397..54e95de3260df 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala @@ -79,9 +79,9 @@ class KafkaDataConsumerSuite extends SparkFunSuite with BeforeAndAfterAll { null } val consumer = KafkaDataConsumer.acquire[Array[Byte], Array[Byte]]( - groupId, topicPartition, kafkaParams.asJava, taskContext, useCache) + topicPartition, kafkaParams.asJava, taskContext, useCache) try { - val rcvd = 0 until data.length map { offset => + val rcvd = (0 until data.length).map { offset => val bytes = consumer.get(offset, 10000).value() new String(bytes) } From 250ad928b6c4893c4bbf0faf1cb0a84e8c152567 Mon Sep 17 
00:00:00 2001 From: Gabor Somogyi Date: Thu, 12 Apr 2018 17:00:15 +0200 Subject: [PATCH 3/7] Compile fix --- .../org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala index dbc7c16bd8612..1543b112f8410 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala @@ -224,7 +224,7 @@ object KafkaDataConsumer extends Logging { private case class CacheKey(groupId: String, topicPartition: TopicPartition) // Don't want to depend on guava, don't want a cleanup thread, use a simple LinkedHashMap - private[kafka010] var cache: ju.Map[CacheKey, ju.List[InternalKafkaConsumer[_, _]]] = null + private var cache: ju.Map[CacheKey, ju.List[InternalKafkaConsumer[_, _]]] = null /** * Must be called before acquire, once per JVM, to configure the cache. From 215339db6f29601102465c7f16ae744236683e9e Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Fri, 13 Apr 2018 09:59:44 +0200 Subject: [PATCH 4/7] Add reuse test --- .../kafka010/KafkaDataConsumer.scala | 13 ++-- .../kafka010/KafkaDataConsumerSuite.scala | 63 +++++++++++++------ 2 files changed, 50 insertions(+), 26 deletions(-) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala index 1543b112f8410..ab8009ef61c38 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala @@ -74,7 +74,7 @@ private[kafka010] sealed trait KafkaDataConsumer[K, V] { def release(): Unit /** Reference to the internal implementation that this wrapper delegates to */ - protected def internalConsumer: InternalKafkaConsumer[K, V] + private[kafka010] def internalConsumer: InternalKafkaConsumer[K, V] } @@ -82,7 +82,7 @@ private[kafka010] sealed trait KafkaDataConsumer[K, V] { * A wrapper around Kafka's KafkaConsumer. * This is not for direct use outside this file. 
*/ -private class InternalKafkaConsumer[K, V]( +private[kafka010] class InternalKafkaConsumer[K, V]( val topicPartition: TopicPartition, val kafkaParams: ju.Map[String, Object]) extends Logging { @@ -207,8 +207,9 @@ private class InternalKafkaConsumer[K, V]( } -private[kafka010] -object KafkaDataConsumer extends Logging { +private[kafka010] case class CacheKey(groupId: String, topicPartition: TopicPartition) + +private[kafka010] object KafkaDataConsumer extends Logging { private case class CachedKafkaDataConsumer[K, V](internalConsumer: InternalKafkaConsumer[K, V]) extends KafkaDataConsumer[K, V] { @@ -221,10 +222,8 @@ object KafkaDataConsumer extends Logging { override def release(): Unit = internalConsumer.close() } - private case class CacheKey(groupId: String, topicPartition: TopicPartition) - // Don't want to depend on guava, don't want a cleanup thread, use a simple LinkedHashMap - private var cache: ju.Map[CacheKey, ju.List[InternalKafkaConsumer[_, _]]] = null + private[kafka010] var cache: ju.Map[CacheKey, ju.List[InternalKafkaConsumer[_, _]]] = null /** * Must be called before acquire, once per JVM, to configure the cache. diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala index 54e95de3260df..2fe8313ef9618 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala @@ -25,21 +25,24 @@ import scala.util.Random import org.apache.kafka.clients.consumer.ConsumerConfig._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.ByteArrayDeserializer -import org.scalatest.BeforeAndAfterAll +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.apache.spark._ -class KafkaDataConsumerSuite extends SparkFunSuite with BeforeAndAfterAll { - +class KafkaDataConsumerSuite extends SparkFunSuite with BeforeAndAfterAll with BeforeAndAfterEach { private var testUtils: KafkaTestUtils = _ + private val topic = "topic" + Random.nextInt() + private val topicPartition = new TopicPartition(topic, 0) + private val groupId = "groupId" - override def beforeAll { + override def beforeAll(): Unit = { super.beforeAll() testUtils = new KafkaTestUtils testUtils.setup() + KafkaDataConsumer.init(16, 64, 0.75f) } - override def afterAll { + override def afterAll(): Unit = { if (testUtils != null) { testUtils.teardown() testUtils = null @@ -47,24 +50,46 @@ class KafkaDataConsumerSuite extends SparkFunSuite with BeforeAndAfterAll { super.afterAll() } - test("concurrent use of KafkaDataConsumer") { - KafkaDataConsumer.init(16, 64, 0.75f) + override def afterEach(): Unit = { + super.afterEach() + KafkaDataConsumer.cache.clear() + } + + private def getKafkaParams() = Map[String, Object]( + GROUP_ID_CONFIG -> groupId, + BOOTSTRAP_SERVERS_CONFIG -> testUtils.brokerAddress, + KEY_DESERIALIZER_CLASS_CONFIG -> classOf[ByteArrayDeserializer].getName, + VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[ByteArrayDeserializer].getName, + AUTO_OFFSET_RESET_CONFIG -> "earliest", + ENABLE_AUTO_COMMIT_CONFIG -> "false" + ).asJava + + test("KafkaDataConsumer reuse in case of same groupId and TopicPartition") { + val kafkaParams = getKafkaParams() + + val consumer1 = KafkaDataConsumer.acquire[Array[Byte], Array[Byte]]( + topicPartition, kafkaParams, null, true) + 
consumer1.release() + + val consumer2 = KafkaDataConsumer.acquire[Array[Byte], Array[Byte]]( + topicPartition, kafkaParams, null, true) + consumer2.release() + + assert(KafkaDataConsumer.cache.size() == 1) + val key = new CacheKey(groupId, topicPartition) + val existingInternalConsumers = KafkaDataConsumer.cache.get(key) + assert(existingInternalConsumers.size() == 1) + val existingInternalConsumer = existingInternalConsumers.get(0) + assert(existingInternalConsumer.eq(consumer1.internalConsumer)) + assert(existingInternalConsumer.eq(consumer2.internalConsumer)) + } - val topic = "topic" + Random.nextInt() + test("concurrent use of KafkaDataConsumer") { val data = (1 to 1000).map(_.toString) - val topicPartition = new TopicPartition(topic, 0) testUtils.createTopic(topic) testUtils.sendMessages(topic, data.toArray) - val groupId = "groupId" - val kafkaParams = Map[String, Object]( - GROUP_ID_CONFIG -> groupId, - BOOTSTRAP_SERVERS_CONFIG -> testUtils.brokerAddress, - KEY_DESERIALIZER_CLASS_CONFIG -> classOf[ByteArrayDeserializer].getName, - VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[ByteArrayDeserializer].getName, - AUTO_OFFSET_RESET_CONFIG -> "earliest", - ENABLE_AUTO_COMMIT_CONFIG -> "false" - ) + val kafkaParams = getKafkaParams() val numThreads = 100 val numConsumerUsages = 500 @@ -79,7 +104,7 @@ class KafkaDataConsumerSuite extends SparkFunSuite with BeforeAndAfterAll { null } val consumer = KafkaDataConsumer.acquire[Array[Byte], Array[Byte]]( - topicPartition, kafkaParams.asJava, taskContext, useCache) + topicPartition, kafkaParams, taskContext, useCache) try { val rcvd = (0 until data.length).map { offset => val bytes = consumer.get(offset, 10000).value() From 7aa32578950476e7d409be9ba461623e47f4714d Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Fri, 13 Apr 2018 22:00:31 +0200 Subject: [PATCH 5/7] Reuse test fix --- .../streaming/kafka010/KafkaDataConsumerSuite.scala | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala index 2fe8313ef9618..5fd77e13e04bb 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala @@ -25,11 +25,11 @@ import scala.util.Random import org.apache.kafka.clients.consumer.ConsumerConfig._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.ByteArrayDeserializer -import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.scalatest.BeforeAndAfterAll import org.apache.spark._ -class KafkaDataConsumerSuite extends SparkFunSuite with BeforeAndAfterAll with BeforeAndAfterEach { +class KafkaDataConsumerSuite extends SparkFunSuite with BeforeAndAfterAll { private var testUtils: KafkaTestUtils = _ private val topic = "topic" + Random.nextInt() private val topicPartition = new TopicPartition(topic, 0) @@ -50,11 +50,6 @@ class KafkaDataConsumerSuite extends SparkFunSuite with BeforeAndAfterAll with B super.afterAll() } - override def afterEach(): Unit = { - super.afterEach() - KafkaDataConsumer.cache.clear() - } - private def getKafkaParams() = Map[String, Object]( GROUP_ID_CONFIG -> groupId, BOOTSTRAP_SERVERS_CONFIG -> testUtils.brokerAddress, @@ -65,6 +60,8 @@ class KafkaDataConsumerSuite extends SparkFunSuite with BeforeAndAfterAll 
with B ).asJava test("KafkaDataConsumer reuse in case of same groupId and TopicPartition") { + KafkaDataConsumer.cache.clear() + val kafkaParams = getKafkaParams() val consumer1 = KafkaDataConsumer.acquire[Array[Byte], Array[Byte]]( From 2c453883869921c99024c02f0a29aac395c82341 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Sat, 21 Apr 2018 11:28:32 +0200 Subject: [PATCH 6/7] Switching back to single cached consumer --- .../kafka010/KafkaDataConsumer.scala | 125 ++++++++---------- .../kafka010/KafkaDataConsumerSuite.scala | 4 +- 2 files changed, 57 insertions(+), 72 deletions(-) diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala index ab8009ef61c38..8f07d24345352 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala @@ -19,8 +19,6 @@ package org.apache.spark.streaming.kafka010 import java.{util => ju} -import scala.collection.JavaConverters._ - import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer} import org.apache.kafka.common.{KafkaException, TopicPartition} @@ -223,7 +221,7 @@ private[kafka010] object KafkaDataConsumer extends Logging { } // Don't want to depend on guava, don't want a cleanup thread, use a simple LinkedHashMap - private[kafka010] var cache: ju.Map[CacheKey, ju.List[InternalKafkaConsumer[_, _]]] = null + private[kafka010] var cache: ju.Map[CacheKey, InternalKafkaConsumer[_, _]] = null /** * Must be called before acquire, once per JVM, to configure the cache. @@ -235,13 +233,26 @@ private[kafka010] object KafkaDataConsumer extends Logging { loadFactor: Float): Unit = synchronized { if (null == cache) { logInfo(s"Initializing cache $initialCapacity $maxCapacity $loadFactor") - cache = new ju.LinkedHashMap[CacheKey, ju.List[InternalKafkaConsumer[_, _]]]( + cache = new ju.LinkedHashMap[CacheKey, InternalKafkaConsumer[_, _]]( initialCapacity, loadFactor, true) { override def removeEldestEntry( - entry: ju.Map.Entry[CacheKey, ju.List[InternalKafkaConsumer[_, _]]]): Boolean = { - if (this.size > maxCapacity) { - try { - entry.getValue.asScala.foreach(_.close()) + entry: ju.Map.Entry[CacheKey, InternalKafkaConsumer[_, _]]): Boolean = { + + // Try to remove the least-used entry if its currently not in use. + // + // If you cannot remove it, then the cache will keep growing. In the worst case, + // the cache will grow to the max number of concurrent tasks that can run in the executor, + // (that is, number of tasks slots) after which it will never reduce. This is unlikely to + // be a serious problem because an executor with more than 64 (default) tasks slots is + // likely running on a beefy machine that can handle a large number of simultaneously + // active consumers. 
+ + if (entry.getValue.inUse == false && this.size > maxCapacity) { + logWarning( + s"KafkaConsumer cache hitting max capacity of $maxCapacity, " + + s"removing consumer for ${entry.getKey}") + try { + entry.getValue.close() } catch { case x: KafkaException => logError("Error closing oldest Kafka consumer", x) @@ -271,10 +282,7 @@ private[kafka010] object KafkaDataConsumer extends Logging { useCache: Boolean): KafkaDataConsumer[K, V] = synchronized { val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] val key = new CacheKey(groupId, topicPartition) - val existingInternalConsumers = Option(cache.get(key)) - .getOrElse(new ju.LinkedList[InternalKafkaConsumer[_, _]]) - - cache.putIfAbsent(key, existingInternalConsumers) + val existingInternalConsumer = cache.get(key) lazy val newInternalConsumer = new InternalKafkaConsumer[K, V](topicPartition, kafkaParams) @@ -282,87 +290,66 @@ private[kafka010] object KafkaDataConsumer extends Logging { // If this is reattempt at running the task, then invalidate cached consumers if any and // start with a new one. If prior attempt failures were cache related then this way old // problematic consumers can be removed. - logDebug("Reattempt detected, invalidating cached consumers") - val closedExistingInternalConsumers = new ju.LinkedList[InternalKafkaConsumer[_, _]]() - existingInternalConsumers.asScala.foreach { existingInternalConsumer => - // Consumer exists in cache. If it's in use, mark it for closing later, or close it now. + logDebug(s"Reattempt detected, invalidating cached consumer $existingInternalConsumer") + if (existingInternalConsumer != null) { + // Consumer exists in cache. If its in use, mark it for closing later, or close it now. if (existingInternalConsumer.inUse) { existingInternalConsumer.markedForClose = true } else { existingInternalConsumer.close() - closedExistingInternalConsumers.add(existingInternalConsumer) + // Remove the consumer from cache only if it's closed. + // Marked for close consumers will be removed in release function. 
+ cache.remove(key) } } - existingInternalConsumers.removeAll(closedExistingInternalConsumers) - logDebug("Reattempt detected, new cached consumer will be allocated " + + logDebug("Reattempt detected, new non-cached consumer will be allocated " + s"$newInternalConsumer") - existingInternalConsumers.add(newInternalConsumer) - CachedKafkaDataConsumer(newInternalConsumer) + NonCachedKafkaDataConsumer(newInternalConsumer) } else if (!useCache) { // If consumer reuse turned off, then do not use it, return a new consumer logDebug("Cache usage turned off, new non-cached consumer will be allocated " + s"$newInternalConsumer") NonCachedKafkaDataConsumer(newInternalConsumer) - } else if (existingInternalConsumers.isEmpty) { - // If no consumer already cached, then put a new one into the cache and return it + } else if (existingInternalConsumer == null) { + // If consumer is not already cached, then put a new in the cache and return it logDebug("No cached consumer, new cached consumer will be allocated " + s"$newInternalConsumer") - existingInternalConsumers.add(newInternalConsumer) + cache.put(key, newInternalConsumer) CachedKafkaDataConsumer(newInternalConsumer) + } else if (existingInternalConsumer.inUse) { + // If consumer is already cached but is currently in use, then return a new consumer + logDebug("Used cached consumer found, new non-cached consumer will be allocated " + + s"$newInternalConsumer") + NonCachedKafkaDataConsumer(newInternalConsumer) } else { - // If consumers are already cached find a currently not used - existingInternalConsumers.asScala.find(!_.inUse) match { - // If found a currently not used, then return that consumer - case Some(existingInternalConsumer) => - logDebug("Not used cached consumer found, re-using it " + - s"$existingInternalConsumer") - existingInternalConsumer.inUse = true - // Any given TopicPartition should have a consistent key and value type - CachedKafkaDataConsumer( - existingInternalConsumer.asInstanceOf[InternalKafkaConsumer[K, V]]) - case None => - // If every consumer is currently used, return a new consumer - logDebug("All cached consumers used, new cached consumer will be allocated " + - s"$newInternalConsumer") - existingInternalConsumers.add(newInternalConsumer) - CachedKafkaDataConsumer(newInternalConsumer) - } + // If consumer is already cached and is currently not in use, then return that consumer + logDebug(s"Not used cached consumer found, re-using it $existingInternalConsumer") + existingInternalConsumer.inUse = true + // Any given TopicPartition should have a consistent key and value type + CachedKafkaDataConsumer(existingInternalConsumer.asInstanceOf[InternalKafkaConsumer[K, V]]) } } private def release(internalConsumer: InternalKafkaConsumer[_, _]): Unit = synchronized { // Clear the consumer from the cache if this is indeed the consumer present in the cache val key = new CacheKey(internalConsumer.groupId, internalConsumer.topicPartition) - Option(cache.get(key)) match { - case Some(existingInternalConsumers) => - existingInternalConsumers.asScala.find(_.eq(internalConsumer)) match { - case Some(existingInternalConsumer) => - // The released consumer is the same object as the cached one. 
- if (existingInternalConsumer.markedForClose) { - logDebug(s"Consumer marked for close, closing it $existingInternalConsumer") - existingInternalConsumer.close() - existingInternalConsumers.remove(existingInternalConsumer) - } else { - logDebug("Consumer not marked for close, put back to cache " + - s"$existingInternalConsumer") - existingInternalConsumer.inUse = false - } - case None => - // The released consumer is either not the same one as in the cache, or not in the cache - // at all. This may happen if the cache was invalidate while this consumer was being - // used. Just close this consumer. - internalConsumer.close() - logWarning("Released a supposedly cached consumer that was not found in the " + - s"cache $internalConsumer") - } - case None => - // The consumer list is not even initialized. This may happen when no consumer acquired for - // a specific groupId and topicPartition. This should normally not happen. - // Just close this consumer. + val cachedInternalConsumer = cache.get(key) + if (internalConsumer.eq(cachedInternalConsumer)) { + // The released consumer is the same object as the cached one. + if (internalConsumer.markedForClose) { internalConsumer.close() - logWarning("Released a supposedly cached consumer that was not found in the cache " + - s"because consumer list not allocated $internalConsumer") + cache.remove(key) + } else { + internalConsumer.inUse = false + } + } else { + // The released consumer is either not the same one as in the cache, or not in the cache + // at all. This may happen if the cache was invalidate while this consumer was being used. + // Just close this consumer. + internalConsumer.close() + logInfo(s"Released a supposedly cached consumer that was not found in the cache " + + s"$internalConsumer") } } } diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala index 5fd77e13e04bb..d934c64962adb 100644 --- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala +++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumerSuite.scala @@ -74,9 +74,7 @@ class KafkaDataConsumerSuite extends SparkFunSuite with BeforeAndAfterAll { assert(KafkaDataConsumer.cache.size() == 1) val key = new CacheKey(groupId, topicPartition) - val existingInternalConsumers = KafkaDataConsumer.cache.get(key) - assert(existingInternalConsumers.size() == 1) - val existingInternalConsumer = existingInternalConsumers.get(0) + val existingInternalConsumer = KafkaDataConsumer.cache.get(key) assert(existingInternalConsumer.eq(consumer1.internalConsumer)) assert(existingInternalConsumer.eq(consumer2.internalConsumer)) } From 6cd67c6ac7b948eb791cc4871477ab0b1df4fcad Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Wed, 2 May 2018 12:27:06 +0200 Subject: [PATCH 7/7] Minor nit fixes --- .../org/apache/spark/sql/kafka010/KafkaDataConsumer.scala | 2 +- .../apache/spark/streaming/kafka010/KafkaDataConsumer.scala | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala index 48508d057a540..941f0ab177e48 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala +++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala @@ -395,7 +395,7 @@ private[kafka010] object KafkaDataConsumer extends Logging { // likely running on a beefy machine that can handle a large number of simultaneously // active consumers. - if (entry.getValue.inUse == false && this.size > capacity) { + if (!entry.getValue.inUse && this.size > capacity) { logWarning( s"KafkaConsumer cache hitting max capacity of $capacity, " + s"removing consumer for ${entry.getKey}") diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala index 8f07d24345352..68c5fe9ab066a 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala @@ -72,7 +72,7 @@ private[kafka010] sealed trait KafkaDataConsumer[K, V] { def release(): Unit /** Reference to the internal implementation that this wrapper delegates to */ - private[kafka010] def internalConsumer: InternalKafkaConsumer[K, V] + def internalConsumer: InternalKafkaConsumer[K, V] } @@ -226,7 +226,7 @@ private[kafka010] object KafkaDataConsumer extends Logging { /** * Must be called before acquire, once per JVM, to configure the cache. * Further calls are ignored. - * */ + */ def init( initialCapacity: Int, maxCapacity: Int, @@ -249,7 +249,7 @@ private[kafka010] object KafkaDataConsumer extends Logging { if (entry.getValue.inUse == false && this.size > maxCapacity) { logWarning( - s"KafkaConsumer cache hitting max capacity of $maxCapacity, " + + s"KafkaConsumer cache hitting max capacity of $maxCapacity, " + s"removing consumer for ${entry.getKey}") try { entry.getValue.close()
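
For reference, the API introduced by this series is driven through an acquire/release pair, exactly as KafkaRDDIterator does in the diff above. A minimal, illustrative sketch of that pattern follows; it assumes a `TopicPartition` `topicPartition`, a `java.util.Map[String, Object]` `kafkaParams`, a `TaskContext` `context` (which may be null outside a task, as in the test suite), and an `offset` are in scope, and is not itself part of any commit:

    // Configure the JVM-wide cache once; further calls are ignored.
    KafkaDataConsumer.init(initialCapacity = 16, maxCapacity = 64, loadFactor = 0.75f)

    // Acquire a consumer for this group/partition; with useCache = true an idle
    // cached InternalKafkaConsumer is reused, otherwise a new one is created.
    val consumer = KafkaDataConsumer.acquire[Array[Byte], Array[Byte]](
      topicPartition, kafkaParams, context, useCache = true)
    try {
      // Sequential forward reads are served from the internal buffer;
      // random access forces a seek and a fresh poll.
      val record = consumer.get(offset, 10000)
      // ... process record.value() ...
    } finally {
      // Always release: a cached consumer goes back to the pool (or is closed
      // if it was marked for close); a non-cached consumer is closed outright.
      consumer.release()
    }

The key design point the later commits converge on is that `acquire` never hands the same InternalKafkaConsumer to two callers at once: an in-use cached consumer results in a fresh non-cached one, and task reattempts invalidate the cached entry so a possibly broken consumer is not reused.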