diff --git a/README.md b/README.md
index b7d1b7d74..a62ade20e 100644
--- a/README.md
+++ b/README.md
@@ -130,6 +130,10 @@ spring.mail.host=
spring.mail.port=
```
```
+# Kafka service properties. Used for per-workflow Kafka consumers.
+kafka.consumers.cache.size=50
+```
+```
#Kafka sensor properties. Not all are required. Adjust according to your use case.
kafkaSource.group.id.prefix=hyper_drive_${appUniqueId}
kafkaSource.poll.duration=500
diff --git a/pom.xml b/pom.xml
index 1a2fa2d95..3d31474d6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,6 +75,7 @@
42.3.3
2.2.0
2.2.0
+    <commons.configuration2.version>2.7</commons.configuration2.version>
2.7.3
2.0.3
2.0.2
@@ -108,6 +109,7 @@
1.7
6.2.0.Final
1.12.29
+    <absa.commons.version>1.0.3</absa.commons.version>
1.6
@@ -184,6 +186,11 @@
kafka_${scala.compat.version}
${kafka.version}
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-configuration2</artifactId>
+      <version>${commons.configuration2.version}</version>
+    </dependency>
@@ -243,6 +250,10 @@
org.slf4j
slf4j-log4j12
+        <exclusion>
+          <groupId>commons-beanutils</groupId>
+          <artifactId>commons-beanutils-core</artifactId>
+        </exclusion>
@@ -394,6 +405,12 @@
${embedded.kafka.version}
test
+    <dependency>
+      <groupId>za.co.absa.commons</groupId>
+      <artifactId>commons_${scala.compat.version}</artifactId>
+      <version>${absa.commons.version}</version>
+      <scope>test</scope>
+    </dependency>
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 171c91f25..55c84d561 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -70,6 +70,9 @@ notification.sender.address=
spring.mail.host=
spring.mail.port=
+# Kafka service properties.
+kafka.consumers.cache.size=50
+
#Kafka sensor properties.
kafkaSource.group.id.prefix=hyper_drive
kafkaSource.poll.duration=500
diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointService.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointService.scala
new file mode 100644
index 000000000..df892a782
--- /dev/null
+++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointService.scala
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.trigger.api.rest.services
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import org.apache.hadoop.fs.{Path, PathFilter}
+import org.apache.hadoop.security.UserGroupInformation
+import org.slf4j.LoggerFactory
+import org.springframework.stereotype.Service
+import za.co.absa.hyperdrive.trigger.api.rest.utils.ScalaUtil.swap
+
+import javax.inject.Inject
+import scala.util.Try
+
+trait CheckpointService {
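+  /** Offsets keyed by topic and then by partition, e.g. Map("my.topic" -> Map(0 -> 21L)) (illustrative values). */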
+ type TopicPartitionOffsets = Map[String, Map[Int, Long]]
+
+ def getOffsetsFromFile(path: String)(implicit ugi: UserGroupInformation): Try[Option[TopicPartitionOffsets]]
+ def getLatestOffsetFilePath(params: HdfsParameters)(implicit
+ ugi: UserGroupInformation
+ ): Try[Option[(String, Boolean)]]
+}
+
+class HdfsParameters(
+ val keytab: String,
+ val principal: String,
+ val checkpointLocation: String
+)
+
+@Service
+class CheckpointServiceImpl @Inject() (hdfsService: HdfsService) extends CheckpointService {
+ private val logger = LoggerFactory.getLogger(this.getClass)
+ private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
+ private val offsetsDirName = "offsets"
+ private val commitsDirName = "commits"
+
+ /**
+ * See org.apache.spark.sql.execution.streaming.HDFSMetadataLog
+ */
+ private val batchFilesFilter = new PathFilter {
+ override def accept(path: Path): Boolean = {
+ try {
+ path.getName.toLong
+ true
+ } catch {
+ case _: NumberFormatException =>
+ false
+ }
+ }
+ }
+
+ override def getOffsetsFromFile(
+ path: String
+ )(implicit ugi: UserGroupInformation): Try[Option[TopicPartitionOffsets]] = {
+ hdfsService.parseFileAndClose(path, parseKafkaOffsetStream)
+ }
+
+ /**
+ * @return an Option of a String, Boolean pair. The string contains the path to the latest offset file, while the
+ * boolean is true if the offset is committed (i.e. a corresponding commit file exists), and false otherwise.
+   *         None is returned if no offset file exists, in which case the corresponding commit file is
+   *         assumed to not exist either.
+ */
+ override def getLatestOffsetFilePath(
+ params: HdfsParameters
+ )(implicit ugi: UserGroupInformation): Try[Option[(String, Boolean)]] = {
+ getLatestOffsetBatchId(params.checkpointLocation).flatMap { offsetBatchIdOpt =>
+ val offsetFilePath = offsetBatchIdOpt.map { offsetBatchId =>
+ getLatestCommitBatchId(params.checkpointLocation).map { commitBatchIdOpt =>
+ val committed = commitBatchIdOpt match {
+ case Some(commitBatchId) => offsetBatchId == commitBatchId
+ case None => false
+ }
+ val path = new Path(s"${params.checkpointLocation}/${offsetsDirName}/${offsetBatchId}")
+ (path.toString, committed)
+ }
+ }
+ if (offsetFilePath.isEmpty) {
+ logger.debug(s"No offset files exist under checkpoint location ${params.checkpointLocation}")
+ }
+ swap(offsetFilePath)
+ }
+ }
+
+ /**
+ * see org.apache.spark.sql.execution.streaming.OffsetSeqLog
+ * and org.apache.spark.sql.kafka010.JsonUtils
+ * for details on the assumed format
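+   * Illustrative example of an offset file (cf. the parsing test in CheckpointServiceTest):
+   *   v1
+   *   {"batchWatermarkMs":0,"batchTimestampMs":1633360640176}
+   *   {"my.topic":{"0":21,"1":1021,"2":2021}}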
+ */
+ private def parseKafkaOffsetStream(lines: Iterator[String]): TopicPartitionOffsets = {
+ val SERIALIZED_VOID_OFFSET = "-"
+ def parseOffset(value: String): Option[TopicPartitionOffsets] = value match {
+ case SERIALIZED_VOID_OFFSET => None
+ case json => Some(mapper.readValue(json, classOf[TopicPartitionOffsets]))
+ }
+ if (!lines.hasNext) {
+ throw new IllegalStateException("Incomplete log file")
+ }
+
+ lines.next() // skip version
+ lines.next() // skip metadata
+ lines
+ .map(parseOffset)
+ .filter(_.isDefined)
+ .map(_.get)
+ .toSeq
+ .head
+ }
+
+ private def getLatestCommitBatchId(checkpointDir: String)(implicit ugi: UserGroupInformation): Try[Option[Long]] = {
+ val commitsDir = new Path(s"$checkpointDir/$commitsDirName")
+ getLatestBatchId(commitsDir)
+ }
+
+ private def getLatestOffsetBatchId(checkpointDir: String)(implicit ugi: UserGroupInformation): Try[Option[Long]] = {
+ val offsetsDir = new Path(s"$checkpointDir/$offsetsDirName")
+ getLatestBatchId(offsetsDir)
+ }
+
+ private def getLatestBatchId(path: Path)(implicit ugi: UserGroupInformation): Try[Option[Long]] = {
+ hdfsService.exists(path).flatMap { exists =>
+ if (exists) {
+ hdfsService.listStatus(path, batchFilesFilter).map { statuses =>
+ statuses
+ .map { status =>
+ status.getPath.getName.toLong
+ }
+ .sorted
+ .lastOption
+
+ }
+ } else {
+ logger.debug(s"Could not find path $path")
+ Try(None)
+ }
+ }
+ }
+}
diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HdfsService.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HdfsService.scala
new file mode 100644
index 000000000..b0c8d286b
--- /dev/null
+++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HdfsService.scala
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.trigger.api.rest.services
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs._
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.slf4j.LoggerFactory
+import org.springframework.stereotype.Service
+
+import java.nio.charset.StandardCharsets.UTF_8
+import java.security.PrivilegedExceptionAction
+import scala.io.Source
+import scala.util.Try
+
+trait HdfsService {
+ def exists(path: Path)(implicit ugi: UserGroupInformation): Try[Boolean]
+ def open(path: Path)(implicit ugi: UserGroupInformation): Try[FSDataInputStream]
+ def listStatus(path: Path, filter: PathFilter)(implicit ugi: UserGroupInformation): Try[Array[FileStatus]]
+ def parseFileAndClose[R](pathStr: String, parseFn: Iterator[String] => R)(implicit
+ ugi: UserGroupInformation
+ ): Try[Option[R]]
+}
+
+@Service
+class HdfsServiceImpl extends HdfsService {
+ private val logger = LoggerFactory.getLogger(this.getClass)
+ private lazy val conf = SparkHadoopUtil.get.conf
+ override def exists(path: Path)(implicit ugi: UserGroupInformation): Try[Boolean] = {
+ Try {
+ doAs {
+ fs.exists(path)
+ }
+ }
+ }
+
+ override def open(path: Path)(implicit ugi: UserGroupInformation): Try[FSDataInputStream] = {
+ Try {
+ doAs {
+ fs.open(path)
+ }
+ }
+ }
+
+ override def listStatus(path: Path, filter: PathFilter)(implicit
+ ugi: UserGroupInformation
+ ): Try[Array[FileStatus]] = {
+ Try {
+ doAs {
+ fs.listStatus(path, filter)
+ }
+ }
+ }
+
+ /**
+ * @param pathStr path to the file as a string
+ * @param parseFn function that parses the file line by line. Caution: It must materialize the content,
+ * because the file is closed after the method completes. E.g. it must not return an iterator.
+ * @tparam R type of the parsed value
+   * @return Success(None) if the file doesn't exist, Success(Some(...)) with the parsed content, Failure if the file could not be read or parsed
+ */
+ override def parseFileAndClose[R](pathStr: String, parseFn: Iterator[String] => R)(implicit
+ ugi: UserGroupInformation
+ ): Try[Option[R]] = {
+ for {
+ path <- Try(new Path(pathStr))
+ exists <- exists(path)
+ parseResult <-
+ if (exists) {
+ open(path).map { input =>
+ try {
+ val lines = Source.fromInputStream(input, UTF_8.name()).getLines()
+ Some(parseFn(lines))
+ } catch {
+ case e: Exception =>
+ // re-throw the exception with the log file path added
+ throw new Exception(s"Failed to parse file $path", e)
+ } finally {
+ IOUtils.closeQuietly(input)
+ }
+ }
+
+ } else {
+ logger.debug(s"Could not find file $path")
+ Try(None)
+ }
+ } yield parseResult
+ }
+
+ /**
+ * Must not be a lazy val, because different users should get different FileSystems. FileSystem is cached internally.
+ */
+ private def fs = FileSystem.get(conf)
+
+ private def doAs[T](fn: => T)(implicit ugi: UserGroupInformation) = {
+ ugi.doAs(new PrivilegedExceptionAction[T] {
+ override def run(): T = {
+ fn
+ }
+ })
+ }
+}
diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetComparisonService.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetComparisonService.scala
new file mode 100644
index 000000000..8b7bed734
--- /dev/null
+++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetComparisonService.scala
@@ -0,0 +1,271 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.trigger.api.rest.services
+
+import org.apache.commons.configuration2.builder.BasicConfigurationBuilder
+import org.apache.commons.configuration2.builder.fluent.Parameters
+import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler
+import org.apache.commons.configuration2.{BaseConfiguration, Configuration}
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.slf4j.LoggerFactory
+import org.springframework.stereotype.Service
+import za.co.absa.hyperdrive.trigger.configuration.application.SparkConfig
+import za.co.absa.hyperdrive.trigger.models.enums.JobTypes
+import za.co.absa.hyperdrive.trigger.models.{JobInstanceParameters, SparkInstanceParameters}
+
+import java.util.Properties
+import javax.inject.Inject
+import scala.concurrent.{ExecutionContext, Future}
+
+trait HyperdriveOffsetComparisonService {
+ def isNewJobInstanceRequired(jobParameters: JobInstanceParameters)(implicit ec: ExecutionContext): Future[Boolean]
+}
+
+@Service
+class HyperdriveOffsetComparisonServiceImpl @Inject() (sparkConfig: SparkConfig,
+ checkpointService: CheckpointService,
+ userGroupInformationService: UserGroupInformationService,
+ kafkaService: KafkaService
+) extends HyperdriveOffsetComparisonService {
+ private val logger = LoggerFactory.getLogger(this.getClass)
+ private val HyperdriveCheckpointKey = "writer.common.checkpoint.location"
+ private val HyperdriveKafkaTopicKey = "reader.kafka.topic"
+ private val HyperdriveKafkaBrokersKey = "reader.kafka.brokers"
+ private val HyperdriveKafkaExtraOptionsKey = "reader.option.kafka"
+ private val PropertyDelimiter = "="
+ private val ListDelimiter = ','
+ private val defaultDeserializer = "org.apache.kafka.common.serialization.StringDeserializer"
+
+ /**
+ * @param jobParameters Parameters for the job instance. Should contain at least
+ * - reader.kafka.topic
+ * - reader.kafka.brokers
+ * - writer.common.checkpoint.location
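+   *                      e.g. (illustrative values) reader.kafka.topic=some-topic,
+   *                      reader.kafka.brokers=http://localhost:9092,
+   *                      writer.common.checkpoint.location=/checkpoint/path/some-topic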
+ * @param ec ExecutionContext
+   * @return - false if the job instance can be skipped, i.e. if the checkpoint offsets are equal to the
+   *         latest offsets on the kafka topic. A job instance can also be skipped if the kafka topic
+   *         doesn't exist or if it's empty.
+ * - true otherwise
+ */
+ def isNewJobInstanceRequired(jobParameters: JobInstanceParameters)(implicit ec: ExecutionContext): Future[Boolean] = {
+ val kafkaParametersOpt = getKafkaParameters(jobParameters)
+ if (kafkaParametersOpt.isEmpty) {
+ logger.debug(s"Kafka parameters were not found in job definition ${jobParameters}")
+ }
+
+ val kafkaEndOffsetsOptFut = Future {
+ kafkaParametersOpt.map { kafkaParameters =>
+ kafkaService.getEndOffsets(kafkaParameters._1, kafkaParameters._2)
+ }
+ }
+
+ val kafkaBeginningOffsetsOptFut = Future {
+ kafkaParametersOpt.map { kafkaParameters =>
+ kafkaService.getBeginningOffsets(kafkaParameters._1, kafkaParameters._2)
+ }
+ }
+
+ val isNewJobInstanceRequiredFut = kafkaEndOffsetsOptFut.flatMap { kafkaEndOffsetsOpt =>
+ kafkaBeginningOffsetsOptFut.flatMap { kafkaBeginningOffsetsOpt =>
+ (kafkaBeginningOffsetsOpt, kafkaEndOffsetsOpt) match {
+ case (Some(kafkaBeginningOffsets), Some(kafkaEndOffsets)) =>
+ if (kafkaBeginningOffsets.isEmpty) {
+ logger.info(s"Topic ${kafkaParametersOpt.get._1} does not exist. Skipping job instance")
+ Future { false }
+ } else if (offsetsEqual(kafkaBeginningOffsets, kafkaEndOffsets)) {
+ logger.info(s"Topic ${kafkaParametersOpt.get._1} is empty. Skipping job instance")
+ Future { false }
+ } else {
+ getCheckpointOffsets(jobParameters, kafkaParametersOpt).map {
+ case Some(checkpointOffsets) =>
+ val allConsumed = offsetsConsumed(checkpointOffsets, kafkaEndOffsets)
+                if (allConsumed) {
+                  logger.info(s"All offsets consumed for topic ${kafkaParametersOpt.get._1}. Skipping job instance")
+                }
+                !allConsumed
+ case _ => true
+ }
+ }
+ case _ => Future { true }
+ }
+ }
+ }
+
+ isNewJobInstanceRequiredFut.recover { case e: Exception =>
+ logger.warn("An error occurred while getting offsets", e)
+ true
+ }
+ }
+
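+  /**
+   * Resolves the raw app arguments with commons-configuration2 so that variable references are interpolated,
+   * e.g. (illustrative) writer.common.checkpoint.location=/checkpoint/path/${reader.kafka.topic} resolves to
+   * /checkpoint/path/some-topic when reader.kafka.topic=some-topic.
+   */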
+ private def getResolvedAppArguments(jobParameters: JobInstanceParameters): Option[Map[String, String]] = {
+ if (!isHyperdriveJob(jobParameters)) {
+ logger.warn(s"Job Parameters ${jobParameters} is not a Hyperdrive Job!")
+ None
+ } else {
+ val sparkParameters = jobParameters.asInstanceOf[SparkInstanceParameters]
+ val args = sparkParameters.appArguments
+ val config = parseConfiguration(args.toArray)
+ import scala.collection.JavaConverters._
+ val resolvedArgs = config.getKeys.asScala.map { k =>
+ k -> config.getString(k)
+ }.toMap
+ Some(resolvedArgs)
+ }
+ }
+
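+  /**
+   * Builds the HDFS parameters from spark.yarn.keytab and spark.yarn.principal in the configured yarn
+   * additional confs, and the resolved writer.common.checkpoint.location argument. Returns None if any
+   * of the three is missing.
+   */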
+ private def getHdfsParameters(resolvedAppArguments: Map[String, String]): Option[HdfsParameters] = {
+ val hdfsParameters = for {
+ keytab <- sparkConfig.yarn.additionalConfs.get("spark.yarn.keytab")
+ principal <- sparkConfig.yarn.additionalConfs.get("spark.yarn.principal")
+ checkpointLocation <- resolvedAppArguments.get(HyperdriveCheckpointKey)
+ } yield new HdfsParameters(keytab, principal, checkpointLocation)
+
+ if (hdfsParameters.isEmpty) {
+ logger.warn(
+ s"Could not extract hdfs parameters from spark config ${sparkConfig}" +
+ s" and resolved app arguments ${resolvedAppArguments}"
+ )
+ }
+
+ hdfsParameters
+ }
+
+ private def getKafkaParameters(jobParameters: JobInstanceParameters): Option[(String, Properties)] = {
+ if (!isHyperdriveJob(jobParameters)) {
+ logger.warn(s"Job Definition ${jobParameters} is not a Hyperdrive Job!")
+ None
+ } else {
+ val args = jobParameters.asInstanceOf[SparkInstanceParameters].appArguments
+ val kafkaParameters = for {
+ topic <- args
+ .find(_.startsWith(s"$HyperdriveKafkaTopicKey="))
+ .map(_.replace(s"$HyperdriveKafkaTopicKey=", ""))
+ brokers <- args
+ .find(_.startsWith(s"$HyperdriveKafkaBrokersKey="))
+ .map(_.replace(s"$HyperdriveKafkaBrokersKey=", ""))
+ extraArgs = args
+ .filter(_.startsWith(s"$HyperdriveKafkaExtraOptionsKey."))
+ .map(_.replace(s"$HyperdriveKafkaExtraOptionsKey.", ""))
+ .filter(_.contains("="))
+ .map { s =>
+ val keyValue = s.split("=", 2)
+ val key = keyValue(0).trim
+ val value = keyValue(1).trim
+ (key, value)
+ }
+ .toMap
+ } yield {
+ val properties = new Properties()
+ properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
+ properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, defaultDeserializer)
+ properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, defaultDeserializer)
+ extraArgs.foreach { case (key, value) => properties.setProperty(key, value) }
+ (topic, properties)
+ }
+
+ if (kafkaParameters.isEmpty) {
+ logger.warn(
+ s"Could not find required kafka parameters in job parameters ${jobParameters} with args ${args}"
+ )
+ }
+ kafkaParameters
+ }
+ }
+
+ private def isHyperdriveJob(jobParameters: JobInstanceParameters) =
+ jobParameters.jobType == JobTypes.Hyperdrive &&
+ jobParameters.isInstanceOf[SparkInstanceParameters]
+
+ private def getCheckpointOffsets(jobParameters: JobInstanceParameters,
+ kafkaParametersOpt: Option[(String, Properties)]
+ )(implicit ec: ExecutionContext): Future[Option[Map[Int, Long]]] = {
+ case class UGIOffset(ugi: UserGroupInformation, latestOffset: (String, Boolean))
+ val hdfsParametersOpt: Option[HdfsParameters] = getResolvedAppArguments(jobParameters).flatMap(getHdfsParameters)
+
+ if (hdfsParametersOpt.isEmpty) {
+ logger.debug(s"Hdfs parameters were not found in job definition ${jobParameters}")
+ }
+
+ Future {
+ val ugiOffset = for {
+ hdfsParameters <- hdfsParametersOpt
+ ugi = userGroupInformationService.loginUserFromKeytab(hdfsParameters.principal, hdfsParameters.keytab)
+ latestOffset <- checkpointService.getLatestOffsetFilePath(hdfsParameters)(ugi).get
+ } yield { UGIOffset(ugi, latestOffset) }
+ if (ugiOffset.isEmpty || !ugiOffset.get.latestOffset._2) {
+ logger.debug(s"Offset does not exist or is not committed ${ugiOffset.map(_.latestOffset)}")
+ None
+ } else {
+ ugiOffset
+ }
+ }.flatMap {
+ case None => Future { None }
+ case Some(_) if kafkaParametersOpt.isEmpty => Future { None }
+ case Some(ugiOffset) =>
+ Future
+ .fromTry(checkpointService.getOffsetsFromFile(ugiOffset.latestOffset._1)(ugiOffset.ugi))
+ .recover { case e: Exception =>
+ logger.warn(s"Couldn't parse file ${ugiOffset.latestOffset._1}", e)
+ None
+ }
+ }.map { hdfsAllOffsetsOpt =>
+ hdfsAllOffsetsOpt.flatMap { hdfsAllOffsets =>
+ val kafkaParameters = kafkaParametersOpt.get
+ hdfsAllOffsets.get(kafkaParameters._1) match {
+ case Some(v) => Some(v)
+ case None =>
+ logger.warn(s"Could not find offsets for topic ${kafkaParameters._1} in hdfs offsets ${hdfsAllOffsets}")
+ None
+ }
+ }
+ }
+ }
+
+ private def parseConfiguration(settings: Array[String]): Configuration = {
+ val configuration = new BasicConfigurationBuilder[BaseConfiguration](classOf[BaseConfiguration])
+ .configure(
+ new Parameters()
+ .basic()
+ .setListDelimiterHandler(new DefaultListDelimiterHandler(ListDelimiter))
+ )
+ .getConfiguration
+
+ settings.foreach(setOrThrow(_, configuration))
+ configuration
+ }
+
+ private def setOrThrow(setting: String, configuration: Configuration): Unit = {
+ if (!setting.contains(PropertyDelimiter)) {
+ throw new IllegalArgumentException(s"Invalid setting format: $setting")
+ } else {
+ val settingKeyValue = setting.split(PropertyDelimiter, 2)
+ configuration.setProperty(settingKeyValue(0).trim, settingKeyValue(1).trim)
+ }
+ }
+
+ private def offsetsEqual(offsets1: Map[Int, Long], offsets2: Map[Int, Long]) = {
+ offsets1.keySet == offsets2.keySet &&
+ offsets1.forall { case (partition, offset1) =>
+ offset1 == offsets2(partition)
+ }
+ }
+
+ private def offsetsConsumed(checkpointOffsets: Map[Int, Long], kafkaOffsets: Map[Int, Long]) = {
+ val isSamePartitions = kafkaOffsets.keySet == checkpointOffsets.keySet
+ isSamePartitions && kafkaOffsets.nonEmpty && kafkaOffsets.forall { case (partition, kafkaPartitionOffset) =>
+ checkpointOffsets(partition) == kafkaPartitionOffset
+ }
+ }
+}
diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaService.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaService.scala
new file mode 100644
index 000000000..2a8f1fde4
--- /dev/null
+++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaService.scala
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.trigger.api.rest.services
+
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.common.TopicPartition
+import org.slf4j.LoggerFactory
+import org.springframework.stereotype.Service
+import org.springframework.util.ConcurrentLruCache
+import za.co.absa.hyperdrive.trigger.api.rest.services.KafkaServiceImpl.{BeginningOffsets, EndOffsets, OffsetFunction}
+import za.co.absa.hyperdrive.trigger.configuration.application.GeneralConfig
+
+import java.util.Properties
+import java.util.UUID.randomUUID
+import javax.inject.Inject
+import scala.collection.JavaConverters._
+
+trait KafkaService {
+ def getBeginningOffsets(topic: String, consumerProperties: Properties): Map[Int, Long]
+ def getEndOffsets(topic: String, consumerProperties: Properties): Map[Int, Long]
+}
+
+@Service
+class KafkaServiceImpl @Inject() (generalConfig: GeneralConfig) extends KafkaService {
+ private val logger = LoggerFactory.getLogger(this.getClass)
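+  // Consumers are cached per (properties, thread id); keying by thread id ensures a cached consumer is
+  // never shared across threads, since KafkaConsumer is not thread-safe.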
+ private val kafkaConsumersCache = new ConcurrentLruCache[(Properties, Long), KafkaConsumer[String, String]](
+ generalConfig.kafkaConsumersCacheSize,
+ createKafkaConsumer
+ )
+
+ override def getBeginningOffsets(topic: String, consumerProperties: Properties): Map[Int, Long] = {
+ getOffsets(topic, consumerProperties, BeginningOffsets)
+ }
+
+ override def getEndOffsets(topic: String, consumerProperties: Properties): Map[Int, Long] = {
+ getOffsets(topic, consumerProperties, EndOffsets)
+ }
+
+ def createKafkaConsumer(propertiesThreadId: (Properties, Long)): KafkaConsumer[String, String] = {
+ logger.info(
+ s"Creating new Kafka Consumer for thread id ${propertiesThreadId._2} and" +
+ s" properties ${propertiesThreadId._1}. Current cache size is ${kafkaConsumersCache.size()}"
+ )
+ new KafkaConsumer[String, String](propertiesThreadId._1)
+ }
+
+ private def getOffsets(topic: String, properties: Properties, offsetFn: OffsetFunction): Map[Int, Long] = {
+ val groupId = s"hyperdrive-trigger-kafkaService-${randomUUID().toString}"
+ properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
+ val consumer = kafkaConsumersCache.get((properties, Thread.currentThread().getId))
+ val partitionInfo = Option(consumer.partitionsFor(topic)).map(_.asScala).getOrElse(Seq())
+ val topicPartitions = partitionInfo.map(p => new TopicPartition(p.topic(), p.partition()))
+ val offsets = offsetFn match {
+ case KafkaServiceImpl.BeginningOffsets => consumer.beginningOffsets(topicPartitions.asJava)
+ case KafkaServiceImpl.EndOffsets => consumer.endOffsets(topicPartitions.asJava)
+ }
+ Option(offsets)
+ .map(_.asScala)
+ .getOrElse(Map())
+ .map { case (topicPartition: TopicPartition, offset: java.lang.Long) =>
+ topicPartition.partition() -> offset.longValue()
+ }
+ .toMap
+ }
+
+}
+
+object KafkaServiceImpl {
+ sealed abstract class OffsetFunction
+ case object BeginningOffsets extends OffsetFunction
+ case object EndOffsets extends OffsetFunction
+}
diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/UserGroupInformationService.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/UserGroupInformationService.scala
new file mode 100644
index 000000000..7cef73b12
--- /dev/null
+++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/UserGroupInformationService.scala
@@ -0,0 +1,30 @@
+
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.trigger.api.rest.services
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.springframework.stereotype.Service
+
+trait UserGroupInformationService {
+ def loginUserFromKeytab(principal: String, keytab: String): UserGroupInformation
+}
+
+@Service
+class UserGroupInformationServiceImpl extends UserGroupInformationService {
+ override def loginUserFromKeytab(principal: String, keytab: String): UserGroupInformation =
+ UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
+}
diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/utils/ScalaUtil.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/utils/ScalaUtil.scala
new file mode 100644
index 000000000..de4a9ab40
--- /dev/null
+++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/utils/ScalaUtil.scala
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.trigger.api.rest.utils
+
+import scala.util.{Failure, Success, Try}
+
+object ScalaUtil {
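+  /**
+   * Turns an Option of a Try inside out, e.g. Some(Success(42)) becomes Success(Some(42)),
+   * Some(Failure(e)) becomes Failure(e), and None becomes Success(None).
+   */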
+ def swap[T](optTry: Option[Try[T]]): Try[Option[T]] = {
+ optTry match {
+ case Some(Success(t)) => Success(Some(t))
+ case Some(Failure(e)) => Failure(e)
+ case None => Success(None)
+ }
+ }
+}
diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/configuration/application/GeneralConfig.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/configuration/application/GeneralConfig.scala
index 75400bf51..3253f65a3 100644
--- a/src/main/scala/za/co/absa/hyperdrive/trigger/configuration/application/GeneralConfig.scala
+++ b/src/main/scala/za/co/absa/hyperdrive/trigger/configuration/application/GeneralConfig.scala
@@ -34,5 +34,7 @@ class GeneralConfig(
@DefaultValue(Array("Unknown"))
val version: String,
@(NotBlank @field)
- val appUniqueId: String
+ val appUniqueId: String,
+ @DefaultValue(Array("50"))
+ val kafkaConsumersCacheSize: Int
)
diff --git a/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointServiceTest.scala b/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointServiceTest.scala
new file mode 100644
index 000000000..8f4a49ba3
--- /dev/null
+++ b/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointServiceTest.scala
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.trigger.api.rest.services
+
+import org.apache.hadoop.fs
+import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.security.UserGroupInformation
+import org.mockito.ArgumentCaptor
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{reset, verify, when}
+import org.mockito.invocation.InvocationOnMock
+import org.scalatest.mockito.MockitoSugar
+import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
+
+import scala.util.{Failure, Try}
+
+class CheckpointServiceTest extends FlatSpec with Matchers with BeforeAndAfter with MockitoSugar {
+ private val hdfsService = mock[HdfsService]
+ private val ugi = mock[UserGroupInformation]
+ private val underTest = new CheckpointServiceImpl(hdfsService)
+
+ before {
+ reset(hdfsService)
+ }
+
+  "getOffsetsFromFile" should "return None if parsing failed" in {
+ when(hdfsService.parseFileAndClose(any(), any())(any())).thenReturn(Try(None))
+
+ val result = underTest.getOffsetsFromFile("non-existent")(ugi)
+
+ result shouldBe Try(None)
+ }
+
+ it should "return the parsed contents" in {
+ val offsets = Map(
+ "topic" -> Map(0 -> 1000L)
+ )
+ when(hdfsService.parseFileAndClose[underTest.TopicPartitionOffsets](any(), any())(any()))
+ .thenReturn(Try(Some(offsets)))
+
+ val result = underTest.getOffsetsFromFile("any")(ugi)
+
+ result shouldBe Try(Some(offsets))
+ }
+
+ it should "parse an offset file" in {
+ underTest.getOffsetsFromFile("any")(ugi)
+ val fnCaptor: ArgumentCaptor[Iterator[String] => underTest.TopicPartitionOffsets] =
+ ArgumentCaptor.forClass(classOf[Iterator[String] => underTest.TopicPartitionOffsets])
+ verify(hdfsService).parseFileAndClose(any(), fnCaptor.capture())(any())
+ val parseFn = fnCaptor.getValue
+ val lines = Seq(
+ "v1",
+ raw"""{"batchWatermarkMs":0,"batchTimestampMs":1633360640176}""",
+ raw"""{"my.topic":{"2":2021,"1":1021,"3":3021,"0":21}, "my.other.topic":{"0":0}}"""
+ ).toIterator
+
+ val result = parseFn.apply(lines)
+
+ result.size shouldBe 2
+ result.head._1 shouldBe "my.topic"
+ result.head._2 should contain theSameElementsAs Map("2" -> 2021, "1" -> 1021, "3" -> 3021, "0" -> 21)
+ result.toSeq(1)._1 shouldBe "my.other.topic"
+ result.toSeq(1)._2 should contain theSameElementsAs Map("0" -> 0)
+ }
+
+  "getLatestOffsetFilePath" should "get the latest offset file, and it is committed" in {
+ when(hdfsService.exists(any())(any())).thenReturn(Try(true))
+ when(hdfsService.listStatus(any(), any())(any())).thenReturn(Try(createOffsetFiles(12)))
+ val params = getHdfsParameters
+
+ val resultTryOpt = underTest.getLatestOffsetFilePath(params)(ugi)
+
+ resultTryOpt.isSuccess shouldBe true
+ resultTryOpt.get.isDefined shouldBe true
+ val result = resultTryOpt.get.get
+    result._1 shouldBe "/checkpoints/offsets/12"
+ result._2 shouldBe true
+ }
+
+ it should "get the latest offset file, and committed = false, if the commits folder is empty" in {
+ when(hdfsService.exists(any())(any())).thenReturn(Try(true))
+ when(hdfsService.listStatus(any(), any())(any())).thenAnswer((invocation: InvocationOnMock) => {
+ val path = invocation.getArgument[fs.Path](0)
+ if (path.toString.contains("offsets")) {
+ Try(createOffsetFiles(12))
+ } else {
+ Try(Array[FileStatus]())
+ }
+ })
+ val params = getHdfsParameters
+
+ val resultTryOpt = underTest.getLatestOffsetFilePath(params)(ugi)
+
+ resultTryOpt.isSuccess shouldBe true
+ resultTryOpt.get.isDefined shouldBe true
+ val result = resultTryOpt.get.get
+ result._1 shouldBe "/checkpoints/offsets/12"
+ result._2 shouldBe false
+ }
+
+ it should "get the latest offset file, and it is not committed" in {
+ when(hdfsService.exists(any())(any())).thenReturn(Try(true))
+ when(hdfsService.listStatus(any(), any())(any())).thenAnswer((invocation: InvocationOnMock) => {
+ val path = invocation.getArgument[fs.Path](0)
+ if (path.toString.contains("offsets")) {
+ Try(createOffsetFiles(12))
+ } else {
+ Try(createOffsetFiles(11))
+ }
+ })
+ val params = getHdfsParameters
+
+ val resultTryOpt = underTest.getLatestOffsetFilePath(params)(ugi)
+
+ resultTryOpt.isSuccess shouldBe true
+ resultTryOpt.get.isDefined shouldBe true
+ val result = resultTryOpt.get.get
+ result._1 shouldBe "/checkpoints/offsets/12"
+ result._2 shouldBe false
+ }
+
+ it should "return None if the checkpoints folder does not exist" in {
+ when(hdfsService.exists(any())(any())).thenReturn(Try(false))
+ val params = getHdfsParameters
+
+ val result = underTest.getLatestOffsetFilePath(params)(ugi)
+
+ result.isSuccess shouldBe true
+ result.get.isDefined shouldBe false
+ }
+
+ it should "return None if the offsets folder is empty" in {
+ when(hdfsService.exists(any())(any())).thenReturn(Try(true))
+ when(hdfsService.listStatus(any(), any())(any())).thenReturn(Try(Array[FileStatus]()))
+ val params = getHdfsParameters
+
+ val result = underTest.getLatestOffsetFilePath(params)(ugi)
+
+ result.isSuccess shouldBe true
+ result.get.isDefined shouldBe false
+ }
+
+ it should "return Failure if an exception occurred while accessing the file system" in {
+ val params = getHdfsParameters
+ when(hdfsService.exists(any())(any())).thenReturn(Failure(new RuntimeException("Failed")))
+
+ val result = underTest.getLatestOffsetFilePath(params)(ugi)
+
+ result.isFailure shouldBe true
+ }
+
+  private def createOffsetFiles(maxBatchId: Int) = {
+    (0 to maxBatchId).map { i =>
+      val fst = new FileStatus()
+      fst.setPath(new fs.Path(s"abc/def/$i"))
+      fst
+    }.toArray
+  }
+
+ private def getHdfsParameters = {
+ new HdfsParameters(
+ keytab = "",
+ principal = "",
+ checkpointLocation = "/checkpoints"
+ )
+ }
+}
diff --git a/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HdfsServiceTest.scala b/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HdfsServiceTest.scala
new file mode 100644
index 000000000..e2e9a4a86
--- /dev/null
+++ b/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HdfsServiceTest.scala
@@ -0,0 +1,71 @@
+
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.trigger.api.rest.services
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.scalatest.mockito.MockitoSugar
+import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
+import za.co.absa.commons.io.TempDirectory
+
+import java.nio.file.{Files, Path}
+
+class HdfsServiceTest extends FlatSpec with BeforeAndAfter with Matchers with MockitoSugar {
+
+ private val underTest = new HdfsServiceImpl
+ private val ugi = UserGroupInformation.getCurrentUser
+
+ private var baseDir: TempDirectory = _
+ private var baseDirPath: Path = _
+
+ before {
+ baseDir = TempDirectory("HdfsServiceTest").deleteOnExit()
+ baseDirPath = baseDir.path.toAbsolutePath
+ }
+
+ after {
+ baseDir.delete()
+ }
+
+ "parseFileAndClose" should "parse an offset file" in {
+ val tmpFile = Files.createTempFile(baseDirPath, "parseFileAndClose", ".txt")
+ val parseFn: Iterator[String] => Int = (_: Iterator[String]) => 42
+
+ val resultTryOpt = underTest.parseFileAndClose(tmpFile.toAbsolutePath.toString, parseFn)(ugi)
+
+ resultTryOpt.isSuccess shouldBe true
+ resultTryOpt.get.isDefined shouldBe true
+ val result = resultTryOpt.get.get
+ result shouldBe 42
+ }
+
+ it should "return Failure if an exception occurred while accessing the file system" in {
+ val parseFn: Iterator[String] => Int = (_: Iterator[String]) => 42
+
+ val result = underTest.parseFileAndClose("", parseFn)(ugi)
+
+ result.isFailure shouldBe true
+ }
+
+ it should "return Failure if parsing throws an error" in {
+ val tmpFile = Files.createTempFile(baseDirPath, "hdfs-service-test", ".txt")
+ val parseFn: Iterator[String] => Int = (_: Iterator[String]) => throw new RuntimeException("Failed parsing")
+ val result = underTest.parseFileAndClose(tmpFile.toAbsolutePath.toString, parseFn)(ugi)
+
+ result.isFailure shouldBe true
+ result.failed.get.getMessage should include("hdfs-service-test")
+ }
+}
diff --git a/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetComparisonServiceTest.scala b/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetComparisonServiceTest.scala
new file mode 100644
index 000000000..5db7445e4
--- /dev/null
+++ b/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetComparisonServiceTest.scala
@@ -0,0 +1,298 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.trigger.api.rest.services
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.mockito.ArgumentCaptor
+import org.mockito.ArgumentMatchers.{any, eq => eqTo}
+import org.mockito.Mockito.{never, reset, verify, when}
+import org.scalatest.mockito.MockitoSugar
+import org.scalatest.{AsyncFlatSpec, BeforeAndAfter, Matchers}
+import za.co.absa.hyperdrive.trigger.configuration.application.DefaultTestSparkConfig
+import za.co.absa.hyperdrive.trigger.models.enums.JobTypes
+import za.co.absa.hyperdrive.trigger.models.{ShellInstanceParameters, SparkInstanceParameters}
+
+import scala.util.Try
+
+class HyperdriveOffsetComparisonServiceTest extends AsyncFlatSpec with Matchers with BeforeAndAfter with MockitoSugar {
+ private val checkpointService = mock[CheckpointService]
+ private val kafkaService = mock[KafkaService]
+ private val ugiService = mock[UserGroupInformationService]
+ private val ugi = mock[UserGroupInformation]
+ private val underTest =
+ new HyperdriveOffsetComparisonServiceImpl(DefaultTestSparkConfig().yarn,
+ checkpointService,
+ ugiService,
+ kafkaService
+ )
+
+ before {
+ reset(checkpointService)
+ reset(kafkaService)
+ }
+
+ "isNewJobInstanceRequired" should "return false if the kafka and checkpoint folder offsets are the same" in {
+ val config = getSparkConfig
+ val underTest = new HyperdriveOffsetComparisonServiceImpl(config.yarn, checkpointService, ugiService, kafkaService)
+ val jobParameters = getJobParameters
+
+ when(ugiService.loginUserFromKeytab(any(), any())).thenReturn(ugi)
+ when(checkpointService.getLatestOffsetFilePath(any())(any()))
+ .thenReturn(Try(Some(("/checkpoint/path/some-topic/offsets/21", true))))
+ when(checkpointService.getOffsetsFromFile(any())(any()))
+ .thenReturn(Try(Some(Map("some-topic" -> Map(2 -> 2021L, 0 -> 21L, 1 -> 1021L)))))
+ when(kafkaService.getBeginningOffsets(any(), any())).thenReturn(Map(0 -> 0L, 1 -> 1L, 2 -> 2L))
+ when(kafkaService.getEndOffsets(any(), any())).thenReturn(Map(0 -> 21L, 1 -> 1021L, 2 -> 2021L))
+
+ val resultFut = underTest.isNewJobInstanceRequired(jobParameters)
+
+ resultFut.map { result =>
+ val hdfsParametersCaptor: ArgumentCaptor[HdfsParameters] = ArgumentCaptor.forClass(classOf[HdfsParameters])
+ verify(checkpointService).getLatestOffsetFilePath(hdfsParametersCaptor.capture())(any())
+ hdfsParametersCaptor.getValue.keytab shouldBe "/path/to/keytab"
+ hdfsParametersCaptor.getValue.principal shouldBe "principal"
+ hdfsParametersCaptor.getValue.checkpointLocation shouldBe "/checkpoint/path/some-topic"
+ verify(checkpointService).getOffsetsFromFile(eqTo("/checkpoint/path/some-topic/offsets/21"))(any())
+ verify(kafkaService).getEndOffsets(eqTo("some-topic"), any())
+ result shouldBe false
+ }
+ }
+
+ it should "return true if the job type is not hyperdrive" in {
+ val jobParameters = getJobParameters.copy(jobType = JobTypes.Spark)
+
+ val resultFut = underTest.isNewJobInstanceRequired(jobParameters)
+
+ resultFut.map { result =>
+ verify(checkpointService, never()).getLatestOffsetFilePath(any())(any())
+ verify(checkpointService, never()).getOffsetsFromFile(any())(any())
+ verify(kafkaService, never()).getEndOffsets(any(), any())
+ result shouldBe true
+ }
+ }
+
+ it should "return true if the topic is not in the app arguments" in {
+ val jobParameters = getJobParameters.copy(appArguments = List("reader.kafka.brokers=http://localhost:9092"))
+
+ val resultFut = underTest.isNewJobInstanceRequired(jobParameters)
+
+ resultFut.map { result =>
+ verify(checkpointService, never()).getLatestOffsetFilePath(any())(any())
+ verify(checkpointService, never()).getOffsetsFromFile(any())(any())
+ verify(kafkaService, never()).getEndOffsets(any(), any())
+ result shouldBe true
+ }
+ }
+
+ it should "return true if the kafka brokers are not in the app arguments" in {
+ val jobParameters = getJobParameters.copy(appArguments = List("reader.kafka.topic=my-topic"))
+
+ val resultFut = underTest.isNewJobInstanceRequired(jobParameters)
+
+ resultFut.map { result =>
+ verify(checkpointService, never()).getLatestOffsetFilePath(any())(any())
+ verify(checkpointService, never()).getOffsetsFromFile(any())(any())
+ verify(kafkaService, never()).getEndOffsets(any(), any())
+ result shouldBe true
+ }
+ }
+
+ it should "return true if the jobParameters are not SparkInstanceParameters" in {
+ val jobParameters = ShellInstanceParameters(
+ jobType = JobTypes.Hyperdrive,
+ scriptLocation = "script.sh"
+ )
+
+ val resultFut = underTest.isNewJobInstanceRequired(jobParameters)
+
+ resultFut.map { result =>
+ verify(checkpointService, never()).getLatestOffsetFilePath(any())(any())
+ verify(checkpointService, never()).getOffsetsFromFile(any())(any())
+ verify(kafkaService, never()).getEndOffsets(any(), any())
+ result shouldBe true
+ }
+ }
+
+ it should "return false if the kafka topic does not exist" in {
+ val config = getSparkConfig
+ val underTest = new HyperdriveOffsetComparisonServiceImpl(config.yarn, checkpointService, ugiService, kafkaService)
+ val jobParameters = getJobParameters
+
+ when(kafkaService.getBeginningOffsets(any(), any())).thenReturn(Map[Int, Long]())
+ when(kafkaService.getEndOffsets(any(), any())).thenReturn(Map[Int, Long]())
+
+ val resultFut = underTest.isNewJobInstanceRequired(jobParameters)
+
+ resultFut.map { result =>
+ verify(checkpointService, never()).getLatestOffsetFilePath(any())(any())
+ result shouldBe false
+ }
+ }
+
+ it should "return false if the kafka topic is empty" in {
+ val config = getSparkConfig
+ val underTest = new HyperdriveOffsetComparisonServiceImpl(config.yarn, checkpointService, ugiService, kafkaService)
+ val jobParameters = getJobParameters
+
+ when(kafkaService.getBeginningOffsets(any(), any())).thenReturn(Map(0 -> 21L, 1 -> 42L))
+ when(kafkaService.getEndOffsets(any(), any())).thenReturn(Map(0 -> 21L, 1 -> 42L))
+
+ val resultFut = underTest.isNewJobInstanceRequired(jobParameters)
+
+ resultFut.map { result =>
+ verify(checkpointService, never()).getLatestOffsetFilePath(any())(any())
+ result shouldBe false
+ }
+ }
+
+ it should "return true if no offset file is present" in {
+ val config = getSparkConfig
+ val underTest = new HyperdriveOffsetComparisonServiceImpl(config.yarn, checkpointService, ugiService, kafkaService)
+ val jobParameters = getJobParameters
+
+ when(kafkaService.getBeginningOffsets(any(), any())).thenReturn(Map(0 -> 0L))
+ when(kafkaService.getEndOffsets(any(), any())).thenReturn(Map(0 -> 100L))
+ when(ugiService.loginUserFromKeytab(any(), any())).thenReturn(ugi)
+ when(checkpointService.getLatestOffsetFilePath(any())(any())).thenReturn(Try(None))
+
+ val resultFut = underTest.isNewJobInstanceRequired(jobParameters)
+
+ resultFut.map { result =>
+ verify(checkpointService).getLatestOffsetFilePath(any())(any())
+ result shouldBe true
+ }
+ }
+
+ it should "return true if the offset is not committed" in {
+ val config = getSparkConfig
+ val underTest = new HyperdriveOffsetComparisonServiceImpl(config.yarn, checkpointService, ugiService, kafkaService)
+ val jobParameters = getJobParameters
+
+ when(kafkaService.getBeginningOffsets(any(), any())).thenReturn(Map(0 -> 0L))
+ when(kafkaService.getEndOffsets(any(), any())).thenReturn(Map(0 -> 100L))
+ when(checkpointService.getLatestOffsetFilePath(any())(any())).thenReturn(Try(Some(("1", false))))
+
+ val resultFut = underTest.isNewJobInstanceRequired(jobParameters)
+
+ resultFut.map { result =>
+ verify(checkpointService).getLatestOffsetFilePath(any())(any())
+ result shouldBe true
+ }
+ }
+
+  it should "return true if an offset file could not be parsed" in {
+ val config = getSparkConfig
+ val underTest = new HyperdriveOffsetComparisonServiceImpl(config.yarn, checkpointService, ugiService, kafkaService)
+ val jobParameters = getJobParameters
+
+ when(kafkaService.getBeginningOffsets(any(), any())).thenReturn(Map(0 -> 0L))
+ when(kafkaService.getEndOffsets(any(), any())).thenReturn(Map(0 -> 100L))
+ when(checkpointService.getLatestOffsetFilePath(any())(any())).thenReturn(Try(Some(("1", true))))
+ when(checkpointService.getOffsetsFromFile(any())(any()))
+ .thenThrow(new RuntimeException("Failed to parse"))
+
+ val resultFut = underTest.isNewJobInstanceRequired(jobParameters)
+
+ resultFut.map { result =>
+ verify(checkpointService).getLatestOffsetFilePath(any())(any())
+ verify(checkpointService).getOffsetsFromFile(any())(any())
+ result shouldBe true
+ }
+ }
+
+ it should "return true if the checkpoints offset does not contain the topic" in {
+ val config = getSparkConfig
+ val underTest = new HyperdriveOffsetComparisonServiceImpl(config.yarn, checkpointService, ugiService, kafkaService)
+ val jobParameters = getJobParameters
+
+ when(kafkaService.getBeginningOffsets(any(), any())).thenReturn(Map(0 -> 0L))
+ when(kafkaService.getEndOffsets(any(), any())).thenReturn(Map(0 -> 100L))
+ when(checkpointService.getLatestOffsetFilePath(any())(any())).thenReturn(Try(Some(("1", true))))
+ when(checkpointService.getOffsetsFromFile(any())(any()))
+ .thenReturn(Try(Some(Map("some-other-topic" -> Map(0 -> 21L)))))
+
+ val resultFut = underTest.isNewJobInstanceRequired(jobParameters)
+
+ resultFut.map { result =>
+ verify(checkpointService).getLatestOffsetFilePath(any())(any())
+ verify(checkpointService).getOffsetsFromFile(any())(any())
+ result shouldBe true
+ }
+ }
+
+ it should "return true if the kafka offsets and checkpoint offset do not have the same set of partitions" in {
+ val config = getSparkConfig
+ val underTest = new HyperdriveOffsetComparisonServiceImpl(config.yarn, checkpointService, ugiService, kafkaService)
+ val jobParameters = getJobParameters
+
+ when(checkpointService.getLatestOffsetFilePath(any())(any())).thenReturn(Try(Some(("1", true))))
+ when(checkpointService.getOffsetsFromFile(any())(any()))
+ .thenReturn(Try(Some(Map("some-topic" -> Map(0 -> 21L)))))
+ when(kafkaService.getBeginningOffsets(any(), any())).thenReturn(Map(0 -> 0L, 1 -> 1L, 2 -> 2L))
+ when(kafkaService.getEndOffsets(any(), any())).thenReturn(Map(0 -> 21L, 1 -> 1021L, 2 -> 2021L))
+
+ val resultFut = underTest.isNewJobInstanceRequired(jobParameters)
+
+ resultFut.map { result =>
+ verify(checkpointService).getLatestOffsetFilePath(any())(any())
+ verify(checkpointService).getOffsetsFromFile(any())(any())
+ verify(kafkaService).getEndOffsets(any(), any())
+ result shouldBe true
+ }
+ }
+
+ it should "return true if the kafka offsets and checkpoint offsets are not the same" in {
+ val config = getSparkConfig
+ val underTest = new HyperdriveOffsetComparisonServiceImpl(config.yarn, checkpointService, ugiService, kafkaService)
+ val jobParameters = getJobParameters
+
+ when(checkpointService.getLatestOffsetFilePath(any())(any())).thenReturn(Try(Some(("1", true))))
+ when(checkpointService.getOffsetsFromFile(any())(any()))
+ .thenReturn(Try(Some(Map("some-topic" -> Map(0 -> 42L, 1 -> 55L)))))
+ when(kafkaService.getBeginningOffsets(any(), any())).thenReturn(Map(0 -> 0L, 1 -> 0L))
+ when(kafkaService.getEndOffsets(any(), any())).thenReturn(Map(0 -> 42L, 1 -> 7L))
+
+ val resultFut = underTest.isNewJobInstanceRequired(jobParameters)
+
+ resultFut.map { result =>
+ verify(checkpointService).getLatestOffsetFilePath(any())(any())
+ verify(checkpointService).getOffsetsFromFile(any())(any())
+ verify(kafkaService).getEndOffsets(any(), any())
+ result shouldBe true
+ }
+ }
+
+ private def getSparkConfig =
+ DefaultTestSparkConfig().copy(additionalConfs =
+ Map(
+ "spark.yarn.keytab" -> "/path/to/keytab",
+ "spark.yarn.principal" -> "principal"
+ )
+ )
+
+ private def getJobParameters = {
+ SparkInstanceParameters(
+ jobType = JobTypes.Hyperdrive,
+ jobJar = "job.jar",
+ mainClass = "mainClass",
+ appArguments = List(
+ "reader.kafka.topic=some-topic",
+ "reader.kafka.brokers=http://localhost:9092",
+ "writer.common.checkpoint.location=/checkpoint/path/${reader.kafka.topic}"
+ )
+ )
+ }
+}
diff --git a/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaServiceTest.scala b/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaServiceTest.scala
new file mode 100644
index 000000000..bbe943ca2
--- /dev/null
+++ b/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaServiceTest.scala
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.trigger.api.rest.services
+
+import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.common.{PartitionInfo, TopicPartition}
+import org.mockito.ArgumentMatchers.{any, eq => eqTo}
+import org.mockito.Mockito.when
+import org.scalatest.mockito.MockitoSugar
+import org.scalatest.{FlatSpec, Matchers}
+import za.co.absa.hyperdrive.trigger.configuration.application.{GeneralConfig, TestGeneralConfig}
+
+import java.util.Properties
+
+class KafkaServiceTest extends FlatSpec with MockitoSugar with Matchers {
+
+ private val mockKafkaConsumer = mock[KafkaConsumer[String, String]]
+ class KafkaServiceTestImpl(generalConfig: GeneralConfig) extends KafkaServiceImpl(generalConfig) {
+ override def createKafkaConsumer(propertiesThreadId: (Properties, Long)): KafkaConsumer[String, String] =
+ mockKafkaConsumer
+ }
+ private val underTest = new KafkaServiceTestImpl(TestGeneralConfig())
+
+ "getEndOffsets" should "return a map of end offsets" in {
+ import scala.collection.JavaConverters._
+ val partitions = Seq(
+ new PartitionInfo("topic", 0, null, null, null),
+ new PartitionInfo("topic", 1, null, null, null)
+ )
+ val endOffsets = Map(
+ new TopicPartition("topic", 0) -> long2Long(200L),
+ new TopicPartition("topic", 1) -> long2Long(400L)
+ ).asJava
+ val topicPartitions = partitions.map(p => new TopicPartition(p.topic(), p.partition())).asJava
+
+ when(mockKafkaConsumer.partitionsFor(any())).thenReturn(partitions.asJava)
+ when(mockKafkaConsumer.endOffsets(eqTo(topicPartitions))).thenReturn(endOffsets)
+
+ val result = underTest.getEndOffsets("topic", new Properties())
+
+ result shouldBe Map(0 -> 200L, 1 -> 400L)
+ }
+
+ it should "return an empty map if partitionsFor returns null" in {
+ when(mockKafkaConsumer.partitionsFor(any())).thenReturn(null)
+
+ val result = underTest.getEndOffsets("non-existent-topic", new Properties())
+
+ result shouldBe Map()
+ }
+
+  it should "return an empty map if endOffsets returns null" in {
+ val partitionInfo = new PartitionInfo("topic", 0, null, null, null)
+ import scala.collection.JavaConverters._
+ val partitions = Seq(partitionInfo).asJava
+ when(mockKafkaConsumer.partitionsFor(any())).thenReturn(partitions)
+ when(mockKafkaConsumer.endOffsets(any())).thenReturn(null)
+
+ val result = underTest.getEndOffsets("non-existent-topic", new Properties())
+
+ result shouldBe Map()
+ }
+}
diff --git a/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/utils/ScalaUtilTest.scala b/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/utils/ScalaUtilTest.scala
new file mode 100644
index 000000000..60b5eb320
--- /dev/null
+++ b/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/utils/ScalaUtilTest.scala
@@ -0,0 +1,36 @@
+
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.hyperdrive.trigger.api.rest.utils
+
+import org.scalatest.{FlatSpec, Matchers}
+import za.co.absa.hyperdrive.trigger.api.rest.utils.ScalaUtil.swap
+
+import scala.util.{Failure, Success}
+
+class ScalaUtilTest extends FlatSpec with Matchers {
+ "swap" should "swap Option[Try] to Try[Option]" in {
+ val ex = new RuntimeException("Failed")
+ val someSuccess = Some(Success(42))
+ val someFailure = Some(Failure(ex))
+ val none = None
+
+ swap(someSuccess) shouldBe Success(Some(42))
+ swap(someFailure) shouldBe Failure(ex)
+ swap(none) shouldBe Success(None)
+ }
+
+}
diff --git a/src/test/scala/za/co/absa/hyperdrive/trigger/configuration/application/TestGeneralConfig.scala b/src/test/scala/za/co/absa/hyperdrive/trigger/configuration/application/TestGeneralConfig.scala
index abfa0b948..e7e3808ef 100644
--- a/src/test/scala/za/co/absa/hyperdrive/trigger/configuration/application/TestGeneralConfig.scala
+++ b/src/test/scala/za/co/absa/hyperdrive/trigger/configuration/application/TestGeneralConfig.scala
@@ -20,7 +20,8 @@ object TestGeneralConfig {
maximumNumberOfWorkflowsInBulkRun: Int = 10,
environment: String = "Unknown",
version: String = "Unknown",
- appUniqueId: String = "20e3f97d-88ac-453c-9524-0166e2c221c5"
+ appUniqueId: String = "20e3f97d-88ac-453c-9524-0166e2c221c5",
+ kafkaConsumersCacheSize: Int = 50
): GeneralConfig =
- new GeneralConfig(maximumNumberOfWorkflowsInBulkRun, environment, version, appUniqueId)
+ new GeneralConfig(maximumNumberOfWorkflowsInBulkRun, environment, version, appUniqueId, kafkaConsumersCacheSize)
}