Description
Background
We use Kafka consumers to obtain the beginning and end offsets.
Such consumers are not thread-safe and cannot safely be used in multithreaded environments, for example inside Futures in Scala.
Creating a new short-lived Kafka consumer for every single offset request would impose significant overhead, so, for now, we are using the LRU cache provided by the Spring framework to reduce it.
However, because Kafka consumers are not thread-safe, each newly created consumer must be bound to a single thread and cannot be reused by other threads. This forces us to write blocking code that happens to run in a separate thread.
Even then, it is unclear whether we can fully prevent race conditions; on the other hand, fetching offsets might not be affected by them. This needs further research.
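To make the constraint concrete, here is a minimal, hypothetical sketch (not our actual code) of the kind of thread-confined, blocking lookup this forces. obtainConsumer stands in for taking a consumer from the LRU cache, deserializers are assumed to be configured in the given properties, and Scala 2.13 collection converters are assumed.

import java.util.Properties
import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

object BlockingOffsetLookup {
  // Single-threaded execution context so the consumer is only ever touched by one thread.
  private val consumerEc: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor())

  // Stand-in for taking a consumer from the LRU cache.
  private def obtainConsumer(props: Properties): Consumer[Array[Byte], Array[Byte]] =
    new KafkaConsumer[Array[Byte], Array[Byte]](props)

  def beginningOffsets(topic: String, props: Properties): Future[Map[Int, Long]] =
    Future {
      val consumer = obtainConsumer(props)
      // Both calls below block the dedicated thread until the broker responds.
      val partitions = consumer.partitionsFor(topic).asScala
        .map(info => new TopicPartition(topic, info.partition()))
      consumer.beginningOffsets(partitions.asJava).asScala
        .map { case (tp, offset) => tp.partition() -> offset.longValue() }
        .toMap
    }(consumerEc)
}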
Feature
The Kafka service should provide an asynchronous interface for getting the offsets of a particular topic.
Example
trait KafkaService {
  def getBeginningOffsets(topic: String, consumerProperties: Properties): Future[Map[Int, Long]]
  def getEndOffsets(topic: String, consumerProperties: Properties): Future[Map[Int, Long]]
  def getBeginningEndOffsets(topic: String, consumerProperties: Properties): Future[BeginningEndOffsets]
}
class KafkaServiceImpl @Inject() (...) extends KafkaService {

  private val consumerPool: KafkaConsumerPool = createConsumerPool(config.consumerPoolConfig)

  override def getBeginningOffsets(topic: String, consumerProperties: Properties): Future[Map[Int, Long]] =
    for {
      consumer <- consumerPool.acquire(consumerProperties)
      offsets  <- beginningOffsetsOf(topic)(consumer)
      _        <- consumerPool.release(consumer)
    } yield offsets

  private def beginningOffsetsOf(topic: String)(consumer: Consumer[Any, Any]): Future[Map[Int, Long]] =
    for {
      partitions <- asyncPartitionsFor(topic)(consumer)
      offsets    <- asyncBeginningOffsetsFor(partitions)(consumer)
    } yield offsets

  private def asyncPartitionsFor(topic: String)(consumer: Consumer[Any, Any]): Future[Seq[TopicPartition]] = ???

  private def asyncBeginningOffsetsFor(partitions: Seq[TopicPartition])(consumer: Consumer[Any, Any]): Future[Map[Int, Long]] = ???
}
class KafkaConsumerPool(...) {
  def acquire(properties: Properties): Future[Consumer[Any, Any]] = ???
  def release(consumer: Consumer[Any, Any]): Future[Unit] = ???
}
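For illustration only, a caller of the proposed interface could compose beginning and end offsets without blocking. The snippet below is hypothetical: kafkaService is an assumed KafkaService instance, offsetRanges is an invented helper name, and java.util.Properties plus an implicit ExecutionContext are assumed in scope.

import java.util.Properties
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical caller: per-partition (beginning, end) offset ranges for a topic.
def offsetRanges(topic: String, props: Properties)(
    implicit ec: ExecutionContext): Future[Map[Int, (Long, Long)]] = {
  // Start both requests before composing them so they run concurrently.
  val beginningF = kafkaService.getBeginningOffsets(topic, props)
  val endF       = kafkaService.getEndOffsets(topic, props)
  for {
    beginning <- beginningF
    end       <- endF
  } yield beginning.map { case (partition, begin) =>
    partition -> (begin, end.getOrElse(partition, begin))
  }
}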
Proposed Solution
Solution Ideas:
- Find a client library that does not rely on the Kafka consumer and can be shared between threads for requesting topic offsets of a particular Kafka cluster. One candidate might be spring-kafka: https://docs.spring.io/spring-kafka/reference/html/
- Research whether getting offsets via the Kafka consumer is susceptible to race conditions.
- Implement a consumer pool as discussed with @jozefbakus in Feature/789 show messages to ingest on hyperdrive jobs #791 (comment). Here is an article on implementing such a pool: https://medium.com/@sutanu.dalui/kafka-consumer-pooling-cc8e9dfc4d24. However, it would still need to be adapted to our needs; a rough sketch of such a pool follows below this list.
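As a starting point for the last idea, here is a rough sketch of a consumer pool matching the acquire/release shape from the Example above. It is only a sketch under simplifying assumptions: a fixed poolSize, one set of consumer properties for the whole pool (so acquire takes no arguments here), deserializers configured in those properties, and no recovery if a caller never releases a consumer. Handing consumers over through a blocking queue also provides the synchronization needed to pass a non-thread-safe consumer between threads, as long as no two threads use it concurrently.

import java.util.Properties
import java.util.concurrent.{ArrayBlockingQueue, Executors}
import scala.concurrent.{ExecutionContext, Future}
import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer}

class KafkaConsumerPool(poolSize: Int, properties: Properties) {
  // Threads used only for the potentially blocking acquire/release calls,
  // so caller threads never block while waiting for a free consumer.
  private implicit val poolEc: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(poolSize))

  // Pre-created consumers, handed out one caller at a time.
  private val consumers = new ArrayBlockingQueue[Consumer[Any, Any]](poolSize)
  (1 to poolSize).foreach(_ => consumers.put(new KafkaConsumer[Any, Any](properties)))

  // Waits (on a pool thread) until a consumer becomes free; the caller must not
  // use it from more than one thread at a time and must release it when done.
  def acquire(): Future[Consumer[Any, Any]] =
    Future(consumers.take())

  def release(consumer: Consumer[Any, Any]): Future[Unit] =
    Future(consumers.put(consumer))
}

A production version would additionally need to release consumers when the downstream Future fails and to close all consumers on shutdown.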