diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/controllers/HyperdriveController.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/controllers/HyperdriveController.scala new file mode 100644 index 000000000..055e0a074 --- /dev/null +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/controllers/HyperdriveController.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.hyperdrive.trigger.api.rest.controllers + +import org.springframework.web.bind.annotation._ +import za.co.absa.hyperdrive.trigger.api.rest.services.HyperdriveService +import za.co.absa.hyperdrive.trigger.models._ + +import java.util.concurrent.CompletableFuture +import javax.inject.Inject +import scala.compat.java8.FutureConverters._ +import scala.concurrent.ExecutionContext.Implicits.global + +@RestController +class HyperdriveController @Inject() (hyperdriveService: HyperdriveService) { + @GetMapping(path = Array("/hyperdrive/workflows/{id}/ingestionStatus")) + def getIngestionStatus(@PathVariable id: Long): CompletableFuture[Seq[IngestionStatus]] = + hyperdriveService.getIngestionStatus(id).toJava.toCompletableFuture +} diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointService.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointService.scala index 85b62f219..1baabdb4d 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointService.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointService.scala @@ -25,7 +25,7 @@ import org.springframework.stereotype.Service import za.co.absa.hyperdrive.trigger.api.rest.utils.ScalaUtil.swap import javax.inject.Inject -import scala.util.Try +import scala.util.{Success, Try} trait CheckpointService { type TopicPartitionOffsets = Map[String, Map[Int, Long]] @@ -34,6 +34,10 @@ trait CheckpointService { def getLatestOffsetFilePath(params: HdfsParameters)( implicit ugi: UserGroupInformation ): Try[Option[(String, Boolean)]] + + def getLatestCommittedOffset(params: HdfsParameters)( + implicit ugi: UserGroupInformation + ): Try[Option[TopicPartitionOffsets]] } class HdfsParameters( @@ -98,6 +102,16 @@ class CheckpointServiceImpl @Inject() (@Lazy hdfsService: HdfsService) extends C } } + override def getLatestCommittedOffset( + params: HdfsParameters + )(implicit ugi: UserGroupInformation): Try[Option[TopicPartitionOffsets]] = { + getLatestCommitBatchId(params.checkpointLocation).flatMap { + _.map { latestCommit => + getOffsetsFromFile(new Path(s"${params.checkpointLocation}/$offsetsDirName/$latestCommit").toString) + }.getOrElse(Success(None)) + } + } + /** * see org.apache.spark.sql.execution.streaming.OffsetSeqLog * and org.apache.spark.sql.kafka010.JsonUtils diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetComparisonService.scala 
b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetService.scala similarity index 77% rename from src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetComparisonService.scala rename to src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetService.scala index 24e3a504c..b8f1ee9d6 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetComparisonService.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetService.scala @@ -26,23 +26,28 @@ import org.springframework.context.annotation.Lazy import org.springframework.stereotype.Service import za.co.absa.hyperdrive.trigger.configuration.application.SparkConfig import za.co.absa.hyperdrive.trigger.models.enums.JobTypes -import za.co.absa.hyperdrive.trigger.models.{JobInstanceParameters, SparkInstanceParameters} +import za.co.absa.hyperdrive.trigger.models.{BeginningEndOffsets, JobInstanceParameters, SparkInstanceParameters} import java.util.Properties import javax.inject.Inject import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} -trait HyperdriveOffsetComparisonService { +trait HyperdriveOffsetService { def isNewJobInstanceRequired(jobParameters: JobInstanceParameters)(implicit ec: ExecutionContext): Future[Boolean] + + def getNumberOfMessagesLeft(jobParameters: JobInstanceParameters)( + implicit ec: ExecutionContext + ): Future[Option[(String, Map[Int, Long])]] } @Service @Lazy -class HyperdriveOffsetComparisonServiceImpl @Inject() (sparkConfig: SparkConfig, - @Lazy checkpointService: CheckpointService, - @Lazy userGroupInformationService: UserGroupInformationService, - kafkaService: KafkaService -) extends HyperdriveOffsetComparisonService { +class HyperdriveOffsetServiceImpl @Inject() (sparkConfig: SparkConfig, + @Lazy checkpointService: CheckpointService, + @Lazy userGroupInformationService: UserGroupInformationService, + kafkaService: KafkaService +) extends HyperdriveOffsetService { private val logger = LoggerFactory.getLogger(this.getClass) private val HyperdriveCheckpointKey = "writer.common.checkpoint.location" private val HyperdriveKafkaTopicKey = "reader.kafka.topic" @@ -52,6 +57,58 @@ class HyperdriveOffsetComparisonServiceImpl @Inject() (sparkConfig: SparkConfig, private val ListDelimiter = ',' private val defaultDeserializer = "org.apache.kafka.common.serialization.StringDeserializer" + /** + * @param jobParameters Parameters for the job instance. Should contain at least + * - reader.kafka.topic + * - reader.kafka.brokers + * - writer.common.checkpoint.location + * @param ec ExecutionContext + * @return the number of messages left to ingest for the job's Kafka topic, per partition, or None if the required parameters or offsets could not be resolved.
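+ * For example, if Kafka reports beginning offsets Map(0 -> 0, 1 -> 10) and end offsets Map(0 -> 10, 1 -> 100),
+ * and the latest committed checkpoint offset is Map(0 -> 2, 1 -> 20), the result is Some(("topic", Map(0 -> 8, 1 -> 80))):
+ * partition 0 has 10 - 2 = 8 messages left and partition 1 has 100 - 20 = 80. (Illustrative values,
+ * mirroring the cases covered in HyperdriveOffsetServiceTest.)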
+ */ + def getNumberOfMessagesLeft( + jobParameters: JobInstanceParameters + )(implicit ec: ExecutionContext): Future[Option[(String, Map[Int, Long])]] = { + val kafkaParametersOpt = getKafkaParameters(jobParameters) + val hdfsParametersOpt: Option[HdfsParameters] = getResolvedAppArguments(jobParameters).flatMap(getHdfsParameters) + + if (kafkaParametersOpt.isEmpty) { + logger.debug(s"Kafka parameters were not found in job definition $jobParameters") + } + + Future( + for { + kafkaParameters <- kafkaParametersOpt + hdfsParameters <- hdfsParametersOpt + } yield { + val kafkaOffsets = kafkaService.getBeginningEndOffsets(kafkaParameters._1, kafkaParameters._2) + kafkaOffsets match { + case BeginningEndOffsets(_, start, end) if start.nonEmpty && end.nonEmpty && start.keySet == end.keySet => + val ugi = userGroupInformationService.loginUserFromKeytab(hdfsParameters.principal, hdfsParameters.keytab) + val hdfsOffsetsTry = checkpointService.getLatestCommittedOffset(hdfsParameters)(ugi).map(_.map(_.head._2)) + + hdfsOffsetsTry match { + case Failure(_) => None + case Success(hdfsOffsetsOption) => + val messagesLeft = kafkaOffsets.beginningOffsets.map { case (partition, kafkaBeginningOffset) => + val kafkaEndOffset = kafkaOffsets.endOffsets(partition) + val numberOfMessages = hdfsOffsetsOption.flatMap(_.get(partition)) match { + case Some(hdfsOffset) if hdfsOffset > kafkaEndOffset => kafkaEndOffset - hdfsOffset + case Some(hdfsOffset) if hdfsOffset > kafkaBeginningOffset => kafkaEndOffset - hdfsOffset + case Some(hdfsOffset) if hdfsOffset <= kafkaBeginningOffset => kafkaEndOffset - kafkaBeginningOffset + case None => kafkaEndOffset - kafkaBeginningOffset + } + partition -> numberOfMessages + } + Some((kafkaOffsets.topic, messagesLeft)) + } + case _ => + logger.warn(s"Inconsistent response from kafka for topic: ${kafkaOffsets.topic}") + None + } + } + ).map(_.flatten) + } + /** * @param jobParameters Parameters for the job instance. Should contain at least * - reader.kafka.topic diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveService.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveService.scala new file mode 100644 index 000000000..bbb617ba1 --- /dev/null +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveService.scala @@ -0,0 +1,83 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.hyperdrive.trigger.api.rest.services + +import org.slf4j.LoggerFactory +import org.springframework.stereotype.Service +import za.co.absa.hyperdrive.trigger.models.{IngestionStatus, TopicStatus} +import za.co.absa.hyperdrive.trigger.models.enums.JobTypes +import za.co.absa.hyperdrive.trigger.persistance.WorkflowRepository + +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} + +trait HyperdriveService { + protected val workflowRepository: WorkflowRepository + protected val jobTemplateService: JobTemplateService + protected val hyperdriveOffsetService: HyperdriveOffsetService + + def getIngestionStatus(id: Long)(implicit ec: ExecutionContext): Future[Seq[IngestionStatus]] +} + +@Service +class HyperdriveServiceImpl( + override protected val workflowRepository: WorkflowRepository, + override protected val jobTemplateService: JobTemplateService, + override protected val hyperdriveOffsetService: HyperdriveOffsetService +) extends HyperdriveService { + private val logger = LoggerFactory.getLogger(this.getClass) + + override def getIngestionStatus(id: Long)(implicit ec: ExecutionContext): Future[Seq[IngestionStatus]] = { + workflowRepository.getWorkflow(id).flatMap { workflow => + jobTemplateService + .resolveJobTemplate(workflow.dagDefinitionJoined) + .flatMap(resolvedJobs => + Future.sequence( + resolvedJobs.map { + case resolvedJob if resolvedJob.jobParameters.jobType == JobTypes.Hyperdrive => + hyperdriveOffsetService.getNumberOfMessagesLeft(resolvedJob.jobParameters).transformWith { + case Failure(exception) => + logger.error(s"Failed to get number of messages left to ingest for a workflow: $id", exception) + Future.successful( + IngestionStatus( + jobName = resolvedJob.name, + jobType = resolvedJob.jobParameters.jobType.name, + topicStatus = None + ) + ) + case Success(messagesLeftOpt) => + Future.successful( + IngestionStatus( + jobName = resolvedJob.name, + jobType = resolvedJob.jobParameters.jobType.name, + topicStatus = messagesLeftOpt.map(messagesLeft => TopicStatus(messagesLeft._1, messagesLeft._2)) + ) + ) + } + case resolvedJob => + Future.successful( + IngestionStatus( + jobName = resolvedJob.name, + jobType = resolvedJob.jobParameters.jobType.name, + topicStatus = None + ) + ) + } + ) + ) + } + } +} diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaService.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaService.scala index b2f2ee3ef..18428a872 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaService.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaService.scala @@ -22,6 +22,7 @@ import org.springframework.stereotype.Service import org.springframework.util.ConcurrentLruCache import za.co.absa.hyperdrive.trigger.api.rest.services.KafkaServiceImpl.{BeginningOffsets, EndOffsets, OffsetFunction} import za.co.absa.hyperdrive.trigger.configuration.application.GeneralConfig +import za.co.absa.hyperdrive.trigger.models.BeginningEndOffsets import java.util.Properties import java.util.UUID.randomUUID @@ -31,6 +32,7 @@ import scala.collection.JavaConverters._ trait KafkaService { def getBeginningOffsets(topic: String, consumerProperties: Properties): Map[Int, Long] def getEndOffsets(topic: String, consumerProperties: Properties): Map[Int, Long] + def getBeginningEndOffsets(topic: String, consumerProperties: Properties): BeginningEndOffsets } @Service @@ -50,6 +52,14 @@ class KafkaServiceImpl @Inject() 
(generalConfig: GeneralConfig) extends KafkaSer getOffsets(topic, consumerProperties, EndOffsets) } + def getBeginningEndOffsets(topic: String, consumerProperties: Properties): BeginningEndOffsets = { + BeginningEndOffsets( + topic, + getOffsets(topic, consumerProperties, BeginningOffsets), + getOffsets(topic, consumerProperties, EndOffsets) + ) + } + def createKafkaConsumer(propertiesThreadId: (Properties, Long)): KafkaConsumer[String, String] = { logger.info( s"Creating new Kafka Consumer for thread id ${propertiesThreadId._2} and" + diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/models/BeginningEndOffsets.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/models/BeginningEndOffsets.scala new file mode 100644 index 000000000..26e63dd31 --- /dev/null +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/models/BeginningEndOffsets.scala @@ -0,0 +1,22 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.hyperdrive.trigger.models + +case class BeginningEndOffsets( + topic: String, + beginningOffsets: Map[Int, Long], + endOffsets: Map[Int, Long] +) diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/models/IngestionStatus.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/models/IngestionStatus.scala new file mode 100644 index 000000000..35a835da8 --- /dev/null +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/models/IngestionStatus.scala @@ -0,0 +1,24 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.hyperdrive.trigger.models + +case class IngestionStatus( + jobName: String, + jobType: String, + topicStatus: Option[TopicStatus] +) + +case class TopicStatus(topic: String, messagesToIngest: Map[Int, Long]) diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala index 4f3db6af0..346f15779 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala @@ -34,7 +34,7 @@ import org.springframework.beans.factory.BeanFactory import org.springframework.context.annotation.Lazy import za.co.absa.hyperdrive.trigger.scheduler.executors.shell.ShellExecutor import org.springframework.stereotype.Component -import za.co.absa.hyperdrive.trigger.api.rest.services.HyperdriveOffsetComparisonService +import za.co.absa.hyperdrive.trigger.api.rest.services.HyperdriveOffsetService import za.co.absa.hyperdrive.trigger.configuration.application.{SchedulerConfig, SparkConfig} import za.co.absa.hyperdrive.trigger.scheduler.notifications.NotificationSender @@ -49,7 +49,7 @@ class Executors @Inject() ( beanFactory: BeanFactory, implicit val sparkConfig: SparkConfig, schedulerConfig: SchedulerConfig, - @Lazy hyperdriveOffsetComparisonService: HyperdriveOffsetComparisonService + @Lazy hyperdriveOffsetComparisonService: HyperdriveOffsetService ) { private val logger = LoggerFactory.getLogger(this.getClass) private implicit val executionContext: ExecutionContextExecutor = diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala index 0b936e9f9..e25c458fd 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala @@ -16,7 +16,7 @@ package za.co.absa.hyperdrive.trigger.scheduler.executors.spark import org.slf4j.LoggerFactory -import za.co.absa.hyperdrive.trigger.api.rest.services.HyperdriveOffsetComparisonService +import za.co.absa.hyperdrive.trigger.api.rest.services.HyperdriveOffsetService import za.co.absa.hyperdrive.trigger.configuration.application.SparkConfig import za.co.absa.hyperdrive.trigger.models.enums.JobStatuses import za.co.absa.hyperdrive.trigger.models.{JobInstance, SparkInstanceParameters} @@ -31,23 +31,23 @@ object HyperdriveExecutor { jobParameters: SparkInstanceParameters, updateJob: JobInstance => Future[Unit], sparkClusterService: SparkClusterService, - offsetComparisonService: HyperdriveOffsetComparisonService + offsetService: HyperdriveOffsetService )(implicit executionContext: ExecutionContext, sparkConfig: SparkConfig): Future[Unit] = jobInstance.executorJobId match { - case None => submitJob(sparkClusterService, offsetComparisonService, jobInstance, jobParameters, updateJob) + case None => submitJob(sparkClusterService, offsetService, jobInstance, jobParameters, updateJob) case Some(executorJobId) => SparkExecutor.updateJobStatus(executorJobId, jobInstance, updateJob, sparkClusterService) } private def submitJob(sparkClusterService: SparkClusterService, - offsetComparisonService: HyperdriveOffsetComparisonService, + offsetService: HyperdriveOffsetService, jobInstance: JobInstance, jobParameters: SparkInstanceParameters, updateJob: 
JobInstance => Future[Unit] )(implicit executionContext: ExecutionContext) = { logger.debug("Using HyperdriveExecutor") for { - newJobRequired <- offsetComparisonService.isNewJobInstanceRequired(jobParameters) + newJobRequired <- offsetService.isNewJobInstanceRequired(jobParameters) _ <- if (newJobRequired) sparkClusterService.submitJob(jobInstance, jobParameters, updateJob) else updateJob(jobInstance.copy(jobStatus = JobStatuses.NoData)) diff --git a/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointServiceTest.scala b/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointServiceTest.scala index e81567158..09abb94e0 100644 --- a/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointServiceTest.scala +++ b/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointServiceTest.scala @@ -25,7 +25,7 @@ import org.mockito.invocation.InvocationOnMock import org.scalatest.mockito.MockitoSugar import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers} -import scala.util.{Failure, Try} +import scala.util.{Failure, Success, Try} class CheckpointServiceTest extends FlatSpec with Matchers with BeforeAndAfter with MockitoSugar { private val hdfsService = mock[HdfsService] @@ -163,6 +163,88 @@ class CheckpointServiceTest extends FlatSpec with Matchers with BeforeAndAfter w result.isFailure shouldBe true } + "getLatestCommittedOffset" should "fail if file check fails" in { + when(hdfsService.exists(any())(any())).thenReturn(Failure(new Exception())) + val params = getHdfsParameters + + val result = underTest.getLatestCommittedOffset(params)(ugi) + + result.isFailure shouldBe true + } + + it should "fail if list files fails" in { + when(hdfsService.exists(any())(any())).thenReturn(Success(true)) + when(hdfsService.listStatus(any(), any())(any())).thenReturn(Failure(new Exception())) + + val params = getHdfsParameters + + val result = underTest.getLatestCommittedOffset(params)(ugi) + + result.isFailure shouldBe true + } + + it should "return none if dir does not exist" in { + when(hdfsService.exists(any())(any())).thenReturn(Success(false)) + + val params = getHdfsParameters + + val result = underTest.getLatestCommittedOffset(params)(ugi) + + result.get.isDefined shouldBe false + } + + it should "return none if dir is empty" in { + when(hdfsService.exists(any())(any())).thenReturn(Success(true)) + when(hdfsService.listStatus(any(), any())(any())).thenReturn(Success(Array.empty[FileStatus])) + + val params = getHdfsParameters + + val result = underTest.getLatestCommittedOffset(params)(ugi) + + result.get.isDefined shouldBe false + } + + it should "fail if file parse fails" in { + when(hdfsService.exists(any())(any())).thenReturn(Success(true)) + when(hdfsService.listStatus(any(), any())(any())).thenReturn(Success(createOffsetFiles(12))) + when(hdfsService.parseFileAndClose(any(), any())(any())).thenReturn(Failure(new Exception())) + + val params = getHdfsParameters + + val result = underTest.getLatestCommittedOffset(params)(ugi) + + result.isFailure shouldBe true + } + + it should "return none if file cannot be found" in { + when(hdfsService.exists(any())(any())).thenReturn(Success(true)) + when(hdfsService.listStatus(any(), any())(any())).thenReturn(Success(createOffsetFiles(12))) + when(hdfsService.parseFileAndClose(any(), any())(any())).thenReturn(Success(None)) + + val params = getHdfsParameters + + val result = underTest.getLatestCommittedOffset(params)(ugi) + + result.get.isEmpty shouldBe true + } + + it should "return the 
parsed contents" in { + val offsets = Map( + "topic" -> Map(0 -> 1000L) + ) + when(hdfsService.exists(any())(any())).thenReturn(Success(true)) + when(hdfsService.listStatus(any(), any())(any())).thenReturn(Success(createOffsetFiles(12))) + when(hdfsService.parseFileAndClose[underTest.TopicPartitionOffsets](any(), any())(any())) + .thenReturn(Try(Some(offsets))) + + val params = getHdfsParameters + + val result = underTest.getLatestCommittedOffset(params)(ugi) + + result.get.isDefined shouldBe true + result.get shouldBe Some(offsets) + } + private def createOffsetFiles(maxBatchId: Int) = { (0 to maxBatchId).map { i => val fst = new FileStatus() diff --git a/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetComparisonServiceTest.scala b/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetServiceTest.scala similarity index 61% rename from src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetComparisonServiceTest.scala rename to src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetServiceTest.scala index a3144e51d..6baf9fd8e 100644 --- a/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetComparisonServiceTest.scala +++ b/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetServiceTest.scala @@ -23,21 +23,17 @@ import org.scalatest.mockito.MockitoSugar import org.scalatest.{AsyncFlatSpec, BeforeAndAfter, Matchers} import za.co.absa.hyperdrive.trigger.configuration.application.DefaultTestSparkConfig import za.co.absa.hyperdrive.trigger.models.enums.JobTypes -import za.co.absa.hyperdrive.trigger.models.{ShellInstanceParameters, SparkInstanceParameters} +import za.co.absa.hyperdrive.trigger.models.{BeginningEndOffsets, ShellInstanceParameters, SparkInstanceParameters} -import scala.util.Try +import scala.util.{Failure, Success, Try} -class HyperdriveOffsetComparisonServiceTest extends AsyncFlatSpec with Matchers with BeforeAndAfter with MockitoSugar { +class HyperdriveOffsetServiceTest extends AsyncFlatSpec with Matchers with BeforeAndAfter with MockitoSugar { private val checkpointService = mock[CheckpointService] private val kafkaService = mock[KafkaService] private val ugiService = mock[UserGroupInformationService] private val ugi = mock[UserGroupInformation] private val underTest = - new HyperdriveOffsetComparisonServiceImpl(DefaultTestSparkConfig().yarn, - checkpointService, - ugiService, - kafkaService - ) + new HyperdriveOffsetServiceImpl(DefaultTestSparkConfig().yarn, checkpointService, ugiService, kafkaService) before { reset(checkpointService) @@ -46,7 +42,7 @@ class HyperdriveOffsetComparisonServiceTest extends AsyncFlatSpec with Matchers "isNewJobInstanceRequired" should "return false if the kafka and checkpoint folder offsets are the same" in { val config = getSparkConfig - val underTest = new HyperdriveOffsetComparisonServiceImpl(config.yarn, checkpointService, ugiService, kafkaService) + val underTest = new HyperdriveOffsetServiceImpl(config.yarn, checkpointService, ugiService, kafkaService) val jobParameters = getJobParameters when(ugiService.loginUserFromKeytab(any(), any())).thenReturn(ugi) @@ -128,7 +124,7 @@ class HyperdriveOffsetComparisonServiceTest extends AsyncFlatSpec with Matchers it should "return true if the kafka topic does not exist" in { val config = getSparkConfig - val underTest = new HyperdriveOffsetComparisonServiceImpl(config.yarn, checkpointService, ugiService, kafkaService) + val underTest = new 
HyperdriveOffsetServiceImpl(config.yarn, checkpointService, ugiService, kafkaService) val jobParameters = getJobParameters when(kafkaService.getBeginningOffsets(any(), any())).thenReturn(Map[Int, Long]()) @@ -144,7 +140,7 @@ class HyperdriveOffsetComparisonServiceTest extends AsyncFlatSpec with Matchers it should "return false if the kafka topic is empty" in { val config = getSparkConfig - val underTest = new HyperdriveOffsetComparisonServiceImpl(config.yarn, checkpointService, ugiService, kafkaService) + val underTest = new HyperdriveOffsetServiceImpl(config.yarn, checkpointService, ugiService, kafkaService) val jobParameters = getJobParameters when(kafkaService.getBeginningOffsets(any(), any())).thenReturn(Map(0 -> 21L, 1 -> 42L)) @@ -160,7 +156,7 @@ class HyperdriveOffsetComparisonServiceTest extends AsyncFlatSpec with Matchers it should "return true if no offset file is present" in { val config = getSparkConfig - val underTest = new HyperdriveOffsetComparisonServiceImpl(config.yarn, checkpointService, ugiService, kafkaService) + val underTest = new HyperdriveOffsetServiceImpl(config.yarn, checkpointService, ugiService, kafkaService) val jobParameters = getJobParameters when(kafkaService.getBeginningOffsets(any(), any())).thenReturn(Map(0 -> 0L)) @@ -178,7 +174,7 @@ class HyperdriveOffsetComparisonServiceTest extends AsyncFlatSpec with Matchers it should "return true if the offset is not committed" in { val config = getSparkConfig - val underTest = new HyperdriveOffsetComparisonServiceImpl(config.yarn, checkpointService, ugiService, kafkaService) + val underTest = new HyperdriveOffsetServiceImpl(config.yarn, checkpointService, ugiService, kafkaService) val jobParameters = getJobParameters when(kafkaService.getBeginningOffsets(any(), any())).thenReturn(Map(0 -> 0L)) @@ -195,7 +191,7 @@ class HyperdriveOffsetComparisonServiceTest extends AsyncFlatSpec with Matchers it should "return true if a offset file could not be parsed" in { val config = getSparkConfig - val underTest = new HyperdriveOffsetComparisonServiceImpl(config.yarn, checkpointService, ugiService, kafkaService) + val underTest = new HyperdriveOffsetServiceImpl(config.yarn, checkpointService, ugiService, kafkaService) val jobParameters = getJobParameters when(kafkaService.getBeginningOffsets(any(), any())).thenReturn(Map(0 -> 0L)) @@ -215,7 +211,7 @@ class HyperdriveOffsetComparisonServiceTest extends AsyncFlatSpec with Matchers it should "return true if the checkpoints offset does not contain the topic" in { val config = getSparkConfig - val underTest = new HyperdriveOffsetComparisonServiceImpl(config.yarn, checkpointService, ugiService, kafkaService) + val underTest = new HyperdriveOffsetServiceImpl(config.yarn, checkpointService, ugiService, kafkaService) val jobParameters = getJobParameters when(kafkaService.getBeginningOffsets(any(), any())).thenReturn(Map(0 -> 0L)) @@ -235,7 +231,7 @@ class HyperdriveOffsetComparisonServiceTest extends AsyncFlatSpec with Matchers it should "return true if the kafka offsets and checkpoint offset do not have the same set of partitions" in { val config = getSparkConfig - val underTest = new HyperdriveOffsetComparisonServiceImpl(config.yarn, checkpointService, ugiService, kafkaService) + val underTest = new HyperdriveOffsetServiceImpl(config.yarn, checkpointService, ugiService, kafkaService) val jobParameters = getJobParameters when(checkpointService.getLatestOffsetFilePath(any())(any())).thenReturn(Try(Some(("1", true)))) @@ -256,7 +252,7 @@ class HyperdriveOffsetComparisonServiceTest 
extends AsyncFlatSpec with Matchers it should "return true if the kafka offsets and checkpoint offsets are not the same" in { val config = getSparkConfig - val underTest = new HyperdriveOffsetComparisonServiceImpl(config.yarn, checkpointService, ugiService, kafkaService) + val underTest = new HyperdriveOffsetServiceImpl(config.yarn, checkpointService, ugiService, kafkaService) val jobParameters = getJobParameters when(checkpointService.getLatestOffsetFilePath(any())(any())).thenReturn(Try(Some(("1", true)))) @@ -275,6 +271,140 @@ class HyperdriveOffsetComparisonServiceTest extends AsyncFlatSpec with Matchers } } + "getNumberOfMessagesLeft" should "return none if get offsets from kafka fails" in { + val config = getSparkConfig + val jobParameters = getJobParameters + val underTest = new HyperdriveOffsetServiceImpl(config.yarn, checkpointService, ugiService, kafkaService) + + when(ugiService.loginUserFromKeytab(any(), any())).thenReturn(ugi) + when(kafkaService.getBeginningEndOffsets(any(), any())) + .thenReturn(BeginningEndOffsets("topic", Map.empty, Map.empty)) + + val resultFut = underTest.getNumberOfMessagesLeft(jobParameters) + resultFut.map { result => + result shouldBe None + } + } + + it should "return none if get offsets from checkpoint fails" in { + val config = getSparkConfig + val jobParameters = getJobParameters + val underTest = new HyperdriveOffsetServiceImpl(config.yarn, checkpointService, ugiService, kafkaService) + + when(ugiService.loginUserFromKeytab(any(), any())).thenReturn(ugi) + when(kafkaService.getBeginningEndOffsets(any(), any())) + .thenReturn(BeginningEndOffsets("topic", Map(0 -> 0, 1 -> 10), Map(0 -> 10, 1 -> 100))) + when(checkpointService.getLatestCommittedOffset(any())(any())).thenReturn(Failure(new Exception())) + + val resultFut = underTest.getNumberOfMessagesLeft(jobParameters) + resultFut.map { result => + result shouldBe None + } + } + + it should "return number of all messages in kafka if there is no offset in checkpoint" in { + val config = getSparkConfig + val jobParameters = getJobParameters + val topic = "topic" + val expectedResult = (topic, Map(0 -> 10, 1 -> 90)) + + val underTest = new HyperdriveOffsetServiceImpl(config.yarn, checkpointService, ugiService, kafkaService) + + when(ugiService.loginUserFromKeytab(any(), any())).thenReturn(ugi) + when(kafkaService.getBeginningEndOffsets(any(), any())) + .thenReturn(BeginningEndOffsets(topic, Map(0 -> 0, 1 -> 10), Map(0 -> 10, 1 -> 100))) + when(checkpointService.getLatestCommittedOffset(any())(any())).thenReturn(Success(None)) + + val resultFut = underTest.getNumberOfMessagesLeft(jobParameters) + resultFut.map { result => + result.isDefined shouldBe true + result.get shouldBe expectedResult + } + } + + it should "return number of messages left to ingest" in { + val config = getSparkConfig + val jobParameters = getJobParameters + val topic = "topic" + val expectedResult = (topic, Map(0 -> 8, 1 -> 80)) + + val underTest = new HyperdriveOffsetServiceImpl(config.yarn, checkpointService, ugiService, kafkaService) + + when(ugiService.loginUserFromKeytab(any(), any())).thenReturn(ugi) + when(kafkaService.getBeginningEndOffsets(any(), any())) + .thenReturn(BeginningEndOffsets(topic, Map(0 -> 0, 1 -> 10), Map(0 -> 10, 1 -> 100))) + when(checkpointService.getLatestCommittedOffset(any())(any())) + .thenReturn(Try(Some(Map(topic -> Map(0 -> 2L, 1 -> 20L))))) + + val resultFut = underTest.getNumberOfMessagesLeft(jobParameters) + resultFut.map { result => + result.isDefined shouldBe true + result.get shouldBe 
expectedResult + } + } + + it should "return number of messages left to ingest and ignore extra partition in checkpoint offset" in { + val config = getSparkConfig + val jobParameters = getJobParameters + val topic = "topic" + val expectedResult = (topic, Map(0 -> 8, 1 -> 80)) + + val underTest = new HyperdriveOffsetServiceImpl(config.yarn, checkpointService, ugiService, kafkaService) + + when(ugiService.loginUserFromKeytab(any(), any())).thenReturn(ugi) + when(kafkaService.getBeginningEndOffsets(any(), any())) + .thenReturn(BeginningEndOffsets(topic, Map(0 -> 0, 1 -> 10), Map(0 -> 10, 1 -> 100))) + when(checkpointService.getLatestCommittedOffset(any())(any())) + .thenReturn(Try(Some(Map(topic -> Map(0 -> 2L, 1 -> 20L, 3 -> 10L))))) + + val resultFut = underTest.getNumberOfMessagesLeft(jobParameters) + resultFut.map { result => + result.isDefined shouldBe true + result.get shouldBe expectedResult + } + } + + it should "return number of messages left to ingest and handle missing partition in checkpoint offset" in { + val config = getSparkConfig + val jobParameters = getJobParameters + val topic = "topic" + val expectedResult = (topic, Map(0 -> 8, 1 -> 90)) + + val underTest = new HyperdriveOffsetServiceImpl(config.yarn, checkpointService, ugiService, kafkaService) + + when(ugiService.loginUserFromKeytab(any(), any())).thenReturn(ugi) + when(kafkaService.getBeginningEndOffsets(any(), any())) + .thenReturn(BeginningEndOffsets(topic, Map(0 -> 0, 1 -> 10), Map(0 -> 10, 1 -> 100))) + when(checkpointService.getLatestCommittedOffset(any())(any())).thenReturn(Try(Some(Map(topic -> Map(0 -> 2L))))) + + val resultFut = underTest.getNumberOfMessagesLeft(jobParameters) + resultFut.map { result => + result.isDefined shouldBe true + result.get shouldBe expectedResult + } + } + + it should "return negative numbers if kafka offsets are smaller than checkpoint offsets" in { + val config = getSparkConfig + val jobParameters = getJobParameters + val topic = "topic" + val expectedResult = (topic, Map(0 -> -10, 1 -> -100)) + + val underTest = new HyperdriveOffsetServiceImpl(config.yarn, checkpointService, ugiService, kafkaService) + + when(ugiService.loginUserFromKeytab(any(), any())).thenReturn(ugi) + when(kafkaService.getBeginningEndOffsets(any(), any())) + .thenReturn(BeginningEndOffsets(topic, Map(0 -> 0, 1 -> 10), Map(0 -> 10, 1 -> 100))) + when(checkpointService.getLatestCommittedOffset(any())(any())) + .thenReturn(Try(Some(Map(topic -> Map(0 -> 20L, 1 -> 200L))))) + + val resultFut = underTest.getNumberOfMessagesLeft(jobParameters) + resultFut.map { result => + result.isDefined shouldBe true + result.get shouldBe expectedResult + } + } + private def getSparkConfig = DefaultTestSparkConfig().copy(additionalConfs = Map( diff --git a/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveServiceTest.scala b/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveServiceTest.scala new file mode 100644 index 000000000..eebe0c22b --- /dev/null +++ b/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveServiceTest.scala @@ -0,0 +1,104 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.hyperdrive.trigger.api.rest.services + +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito.{reset, when} +import org.scalatest.mockito.MockitoSugar +import org.scalatest.{AsyncFlatSpec, BeforeAndAfter, Matchers} +import za.co.absa.hyperdrive.trigger.models.enums.JobTypes +import za.co.absa.hyperdrive.trigger.models.{ResolvedJobDefinition, ShellInstanceParameters, SparkInstanceParameters} +import za.co.absa.hyperdrive.trigger.models.{IngestionStatus, TopicStatus} +import za.co.absa.hyperdrive.trigger.persistance.WorkflowRepository + +import scala.concurrent.Future + +class HyperdriveServiceTest extends AsyncFlatSpec with Matchers with BeforeAndAfter with MockitoSugar { + private val workflowRepository = mock[WorkflowRepository] + private val jobTemplateService = mock[JobTemplateService] + private val hyperdriveOffsetService = mock[HyperdriveOffsetService] + private val underTest = new HyperdriveServiceImpl(workflowRepository, jobTemplateService, hyperdriveOffsetService) + + before { + reset(workflowRepository) + reset(jobTemplateService) + reset(hyperdriveOffsetService) + } + + "getIngestionStatus" should "fail on get workflow failure" in { + val id = 1 + val error = "error" + when(workflowRepository.getWorkflow(any())(any())).thenReturn(Future.failed(new Exception(error))) + + recoverToSucceededIf[Exception] { + underTest.getIngestionStatus(id) + } + } + + it should "fail on resolve job template failure" in { + val id = 1 + val error = "error" + + when(workflowRepository.getWorkflow(any())(any())).thenReturn(Future(WorkflowFixture.createWorkflowJoined())) + when(jobTemplateService.resolveJobTemplate(any())(any())).thenReturn(Future.failed(new Exception(error))) + + recoverToSucceededIf[Exception] { + underTest.getIngestionStatus(id) + } + } + + it should "succeed" in { + val id = 1 + val error = "error" + val resolvedJobDefinitions = Seq( + ResolvedJobDefinition( + name = "JobA", + jobParameters = SparkInstanceParameters(jobType = JobTypes.Hyperdrive, jobJar = "", mainClass = ""), + order = 0 + ), + ResolvedJobDefinition( + name = "JobB", + jobParameters = SparkInstanceParameters(jobType = JobTypes.Hyperdrive, jobJar = "", mainClass = ""), + order = 1 + ), + ResolvedJobDefinition( + name = "JobC", + jobParameters = SparkInstanceParameters(jobType = JobTypes.Spark, jobJar = "", mainClass = ""), + order = 2 + ), + ResolvedJobDefinition(name = "JobD", jobParameters = ShellInstanceParameters(scriptLocation = ""), order = 3) + ) + val expectedResult = Seq( + IngestionStatus( + jobName = "JobA", + JobTypes.Hyperdrive.name, + topicStatus = Some(TopicStatus(topic = "topic", messagesToIngest = Map.empty)) + ), + IngestionStatus(jobName = "JobB", JobTypes.Hyperdrive.name, topicStatus = None), + IngestionStatus(jobName = "JobC", JobTypes.Spark.name, topicStatus = None), + IngestionStatus(jobName = "JobD", JobTypes.Shell.name, topicStatus = None) + ) + when(workflowRepository.getWorkflow(any())(any())).thenReturn(Future(WorkflowFixture.createWorkflowJoined())) + 
when(jobTemplateService.resolveJobTemplate(any())(any())).thenReturn(Future(resolvedJobDefinitions)) + when(hyperdriveOffsetService.getNumberOfMessagesLeft(any())(any())) + .thenReturn(Future(Some(("topic", Map.empty[Int, Long])))) + .thenReturn(Future.failed(new Exception(error))) + + underTest.getIngestionStatus(id).map { result => + result shouldBe expectedResult + } + } +} diff --git a/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaServiceTest.scala b/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaServiceTest.scala index bbe943ca2..743c15190 100644 --- a/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaServiceTest.scala +++ b/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaServiceTest.scala @@ -22,6 +22,7 @@ import org.mockito.Mockito.when import org.scalatest.mockito.MockitoSugar import org.scalatest.{FlatSpec, Matchers} import za.co.absa.hyperdrive.trigger.configuration.application.{GeneralConfig, TestGeneralConfig} +import za.co.absa.hyperdrive.trigger.models.BeginningEndOffsets import java.util.Properties @@ -73,4 +74,40 @@ class KafkaServiceTest extends FlatSpec with MockitoSugar with Matchers { result shouldBe Map() } + + "getBeginningEndOffsets" should "return a map of start and end offsets" in { + import scala.collection.JavaConverters._ + val topicName = "topic" + val partitions = Seq( + new PartitionInfo(topicName, 0, null, null, null), + new PartitionInfo(topicName, 1, null, null, null) + ) + val endOffsets = Map( + new TopicPartition(topicName, 0) -> long2Long(200L), + new TopicPartition(topicName, 1) -> long2Long(400L) + ).asJava + val startOffsets = Map( + new TopicPartition(topicName, 0) -> long2Long(100L), + new TopicPartition(topicName, 1) -> long2Long(200L) + ).asJava + val topicPartitions = partitions.map(p => new TopicPartition(p.topic(), p.partition())).asJava + + when(mockKafkaConsumer.partitionsFor(any())) + .thenReturn(partitions.asJava) + .thenReturn(partitions.asJava) + when(mockKafkaConsumer.beginningOffsets(eqTo(topicPartitions))).thenReturn(startOffsets) + when(mockKafkaConsumer.endOffsets(eqTo(topicPartitions))).thenReturn(endOffsets) + + val result = underTest.getBeginningEndOffsets(topicName, new Properties()) + + result shouldBe BeginningEndOffsets(topicName, Map(0 -> 100L, 1 -> 200L), Map(0 -> 200L, 1 -> 400L)) + } + + it should "return empty beginning and end offsets if partitionsFor returns null" in { + val topicName = "non-existent-topic" + when(mockKafkaConsumer.partitionsFor(any())).thenReturn(null) + + val result = underTest.getBeginningEndOffsets(topicName, new Properties()) + result shouldBe BeginningEndOffsets(topicName, Map.empty, Map.empty) + } } diff --git a/src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutorTest.scala b/src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutorTest.scala index 46c629af5..d6285c58a 100644 --- a/src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutorTest.scala +++ b/src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutorTest.scala @@ -20,7 +20,7 @@ import org.mockito.ArgumentMatchers.any import org.mockito.Mockito.{never, reset, verify, when} import org.scalatest.mockito.MockitoSugar import org.scalatest.{AsyncFlatSpec, BeforeAndAfter, Matchers} -import za.co.absa.hyperdrive.trigger.api.rest.services.HyperdriveOffsetComparisonService +import 
za.co.absa.hyperdrive.trigger.api.rest.services.HyperdriveOffsetService import za.co.absa.hyperdrive.trigger.configuration.application.{DefaultTestSparkConfig, SparkConfig} import za.co.absa.hyperdrive.trigger.models.enums.JobStatuses.InQueue import za.co.absa.hyperdrive.trigger.models.enums.{JobStatuses, JobTypes} @@ -30,12 +30,12 @@ import java.time.LocalDateTime import scala.concurrent.Future class HyperdriveExecutorTest extends AsyncFlatSpec with MockitoSugar with BeforeAndAfter with Matchers { - private val offsetComparisonServiceMock = mock[HyperdriveOffsetComparisonService] + private val offsetServiceMock = mock[HyperdriveOffsetService] private val sparkClusterServiceMock = mock[SparkClusterService] private val updateJobStub: JobInstance => Future[Unit] = mock[JobInstance => Future[Unit]] before { - reset(offsetComparisonServiceMock) + reset(offsetServiceMock) reset(sparkClusterServiceMock) reset(updateJobStub) } @@ -44,7 +44,7 @@ class HyperdriveExecutorTest extends AsyncFlatSpec with MockitoSugar with Before val jobInstance = getJobInstance val jobInstanceParameters = jobInstance.jobParameters.asInstanceOf[SparkInstanceParameters] - when(offsetComparisonServiceMock.isNewJobInstanceRequired(any())(any())).thenReturn(Future { true }) + when(offsetServiceMock.isNewJobInstanceRequired(any())(any())).thenReturn(Future { true }) when(sparkClusterServiceMock.submitJob(any(), any(), any())).thenReturn(Future { (): Unit }) when(updateJobStub.apply(any[JobInstance])).thenReturn(Future { (): Unit }) @@ -53,7 +53,7 @@ jobInstanceParameters, updateJobStub, sparkClusterServiceMock, - offsetComparisonServiceMock + offsetServiceMock ) resultFut.map { _ => @@ -67,7 +67,7 @@ class HyperdriveExecutorTest extends AsyncFlatSpec with MockitoSugar with Before val jobInstance = getJobInstance val jobInstanceParameters = jobInstance.jobParameters.asInstanceOf[SparkInstanceParameters] - when(offsetComparisonServiceMock.isNewJobInstanceRequired(any())(any())).thenReturn(Future { false }) + when(offsetServiceMock.isNewJobInstanceRequired(any())(any())).thenReturn(Future { false }) when(sparkClusterServiceMock.submitJob(any(), any(), any())).thenReturn(Future { (): Unit }) when(updateJobStub.apply(any[JobInstance])).thenReturn(Future { (): Unit }) @@ -76,7 +76,7 @@ jobInstanceParameters, updateJobStub, sparkClusterServiceMock, - offsetComparisonServiceMock + offsetServiceMock ) resultFut.map { _ => diff --git a/ui/src/app/components/workflows/workflows-home/workflows-home.component.html b/ui/src/app/components/workflows/workflows-home/workflows-home.component.html index 615d56cef..6f075df17 100644 --- a/ui/src/app/components/workflows/workflows-home/workflows-home.component.html +++ b/ui/src/app/components/workflows/workflows-home/workflows-home.component.html @@ -220,6 +220,49 @@ {{workflow.isActive ? 'Yes' : 'No'}} + Workflow name: {{detail.name}} + Project name: {{detail.project}} + Is Active: {{detail.isActive ? 'Yes' : 'No'}}
+ [The HTML tags of this hunk were stripped during extraction. The surviving content of the new row-detail pane is: a datagrid with the columns "Job Name", "Type" and "Progress"; an error placeholder "We couldn't load detail!"; and one row per job rendering {{job.jobName}}, {{job.jobType}} and a progress cell that shows "Topic: {{job?.topic?.topic}}" and "Messages to ingest: {{job?.topic?.messagesToIngest}}", the warning "Offset inconsistency detected. Please contact support team!", or "Not available" for jobs without a topic status.]
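For reference, the progress cell above renders a single number per job, which the UI derives from the backend's per-partition counts via IngestionStatusModelFactory (defined later in this diff). A minimal sketch of that mapping follows; the sample payload and the import path are illustrative assumptions, not part of this change:

    // Sketch: reducing one job's per-partition counts to the number shown in the UI.
    import {
      IngestionStatusModelFactory,
      IngestionStatusResponseModel,
    } from 'src/app/models/ingestionStatus.model';

    // Assumed sample payload for a single Hyperdrive job.
    const response: IngestionStatusResponseModel = {
      jobName: 'JobA',
      jobType: 'Hyperdrive',
      topic: { topic: 'my-topic', messagesToIngest: { 0: 8, 1: 80 } },
    };

    // fromIngestionStatusResponseModel sums the per-partition counts: 8 + 80 = 88.
    const status = IngestionStatusModelFactory.fromIngestionStatusResponseModel(response);
    console.log(status.topic?.messagesToIngest); // 88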
diff --git a/ui/src/app/components/workflows/workflows-home/workflows-home.component.spec.ts b/ui/src/app/components/workflows/workflows-home/workflows-home.component.spec.ts index bf6efbd3e..6f258336a 100644 --- a/ui/src/app/components/workflows/workflows-home/workflows-home.component.spec.ts +++ b/ui/src/app/components/workflows/workflows-home/workflows-home.component.spec.ts @@ -35,6 +35,7 @@ import { ImportWorkflows, RunWorkflows, SearchWorkflows, + LoadIngestionStatus, } from '../../../stores/workflows/workflows.actions'; describe('WorkflowsHomeComponent', () => { @@ -58,6 +59,8 @@ describe('WorkflowsHomeComponent', () => { workflowAction: { loading: false, }, + ingestionStatusLoading: true, + ingestionStatus: [], }, }; @@ -94,7 +97,8 @@ describe('WorkflowsHomeComponent', () => { expect(underTest.filters).toEqual([]); expect(underTest.pageFrom).toEqual(0); expect(underTest.pageSize).toEqual(100); - expect(underTest.page).toEqual(0 / 100 + 1); + expect(underTest.ingestionStatusLoading).toEqual(true); + expect(underTest.ingestionStatus).toEqual([]); }); }), ); @@ -512,4 +516,36 @@ describe('WorkflowsHomeComponent', () => { expect(underTest.isRunSelectedWorkflowsDisabled(workflows)).toBeTrue(); }); }); + + describe('onDetailRefresh', () => { + it( + 'should dispatch load ingestion status', + waitForAsync(() => { + const id = 42; + const storeSpy = spyOn(store, 'dispatch'); + + underTest.onDetailRefresh({ id: id }); + + fixture.detectChanges(); + fixture.whenStable().then(() => { + expect(storeSpy).toHaveBeenCalledWith(new LoadIngestionStatus(id)); + }); + }), + ); + + it( + 'should not dispatch load ingestion status if id is null', + waitForAsync(() => { + const id = 42; + const storeSpy = spyOn(store, 'dispatch'); + + underTest.onDetailRefresh({ id: null }); + + fixture.detectChanges(); + fixture.whenStable().then(() => { + expect(storeSpy).toHaveBeenCalledTimes(0); + }); + }), + ); + }); }); diff --git a/ui/src/app/components/workflows/workflows-home/workflows-home.component.ts b/ui/src/app/components/workflows/workflows-home/workflows-home.component.ts index 4d7df127d..fb3f6af01 100644 --- a/ui/src/app/components/workflows/workflows-home/workflows-home.component.ts +++ b/ui/src/app/components/workflows/workflows-home/workflows-home.component.ts @@ -22,6 +22,7 @@ import { absoluteRoutes } from '../../../constants/routes.constants'; import { ExportWorkflows, ImportWorkflows, + LoadIngestionStatus, LoadJobsForRun, RunWorkflows, SearchWorkflows, @@ -40,6 +41,7 @@ import { workflowsHomeColumns } from 'src/app/constants/workflow.constants'; import { TableSearchRequestModel } from '../../../models/search/tableSearchRequest.model'; import { ContainsFilterAttributes } from '../../../models/search/containsFilterAttributes.model'; import { BooleanFilterAttributes } from '../../../models/search/booleanFilterAttributes.model'; +import { IngestionStatusModel } from '../../../models/ingestionStatus.model'; @Component({ selector: 'app-workflows-home', @@ -77,6 +79,9 @@ export class WorkflowsHomeComponent implements OnInit, AfterViewInit, OnDestroy workflowFile: File = undefined; multiWorkflowsFile: File = undefined; + ingestionStatusLoading = true; + ingestionStatus: IngestionStatusModel[] = []; + constructor(private store: Store, private confirmationDialogService: ConfirmationDialogService, private router: Router) { this.routerSubscription = router.events.pipe(filter((e) => e instanceof ResolveEnd)).subscribe((e: ResolveEnd) => { this.ignoreRefresh = e.state.root.component !== 
WorkflowsHomeComponent; @@ -102,6 +107,8 @@ export class WorkflowsHomeComponent implements OnInit, AfterViewInit, OnDestroy } else { this.loadingAction = state.workflowAction.loading; } + this.ingestionStatusLoading = state.ingestionStatusLoading; + this.ingestionStatus = state.ingestionStatus; }); } @@ -212,6 +219,10 @@ export class WorkflowsHomeComponent implements OnInit, AfterViewInit, OnDestroy } } + onDetailRefresh(event) { + if (!!event?.id) this.store.dispatch(new LoadIngestionStatus(event.id)); + } + refresh() { this.selected = []; const searchRequestModel: TableSearchRequestModel = { diff --git a/ui/src/app/constants/api.constants.ts b/ui/src/app/constants/api.constants.ts index 0d5d4800a..725bb8fba 100644 --- a/ui/src/app/constants/api.constants.ts +++ b/ui/src/app/constants/api.constants.ts @@ -65,6 +65,7 @@ export const api = { GET_QUARTZ_DETAIL: '/util/quartzDetail', GET_NOTIFICATION_RULE_FROM_HISTORY: '/notificationRuleFromHistory', + GET_INGESTION_STATUS: '/hyperdrive/workflows/{id}/ingestionStatus', GET_KAFKA_TOPIC_AUTHORIZATIONS: '/kafka/{kafkaTopic}/authorizations', }; diff --git a/ui/src/app/models/ingestionStatus.model.ts b/ui/src/app/models/ingestionStatus.model.ts new file mode 100644 index 000000000..2b4fca88a --- /dev/null +++ b/ui/src/app/models/ingestionStatus.model.ts @@ -0,0 +1,75 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +export type IngestionStatusModel = { + jobName: string; + jobType: string; + topic?: TopicModel; +}; + +export class IngestionStatusModelFactory { + static create(jobName: string, jobType: string, topic?: TopicModel): IngestionStatusModel { + return { jobName: jobName, jobType: jobType, topic: topic }; + } + + static fromIngestionStatusResponseModel(ingestionStatusResponse: IngestionStatusResponseModel): IngestionStatusModel { + return this.create( + ingestionStatusResponse.jobName, + ingestionStatusResponse.jobType, + ingestionStatusResponse?.topic + ? 
TopicModelFactory.create( + ingestionStatusResponse.topic.topic, + Object.keys(ingestionStatusResponse.topic.messagesToIngest) + .map((key) => ingestionStatusResponse.topic.messagesToIngest[key]) + .reduce((acc, cur) => acc + Number(cur), 0), + ) + : null, + ); + } +} + +export type TopicModel = { + topic: string; + messagesToIngest: number; +}; + +export class TopicModelFactory { + static create(topic: string, messagesToIngest: number): TopicModel { + return { topic: topic, messagesToIngest: messagesToIngest }; + } +} + +export type IngestionStatusResponseModel = { + jobName: string; + jobType: string; + topic?: TopicResponseModel; +}; + +export class IngestionStatusResponseModelFactory { + static create(jobName: string, jobType: string, topic?: TopicResponseModel): IngestionStatusResponseModel { + return { jobName: jobName, jobType: jobType, topic: topic }; + } +} + +export type TopicResponseModel = { + topic: string; + messagesToIngest: { [partition: number]: number }; +}; + +export class TopicResponseModelFactory { + static create(topic: string, messagesToIngest: { [partition: number]: number }): TopicResponseModel { + return { topic: topic, messagesToIngest: messagesToIngest }; + } +} diff --git a/ui/src/app/services/hyperdrive/hyperdrive.service.spec.ts b/ui/src/app/services/hyperdrive/hyperdrive.service.spec.ts new file mode 100644 index 000000000..216db511a --- /dev/null +++ b/ui/src/app/services/hyperdrive/hyperdrive.service.spec.ts @@ -0,0 +1,56 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +import { TestBed } from '@angular/core/testing'; + +import { HttpClientTestingModule, HttpTestingController } from '@angular/common/http/testing'; +import { api } from '../../constants/api.constants'; +import { HyperdriveService } from './hyperdrive.service'; +import { IngestionStatusModelFactory } from '../../models/ingestionStatus.model'; + +describe('HyperdriveService', () => { + let underTest: HyperdriveService; + let httpTestingController: HttpTestingController; + + beforeEach(() => { + TestBed.configureTestingModule({ + providers: [HyperdriveService], + imports: [HttpClientTestingModule], + }); + underTest = TestBed.inject(HyperdriveService); + httpTestingController = TestBed.inject(HttpTestingController); + }); + + afterEach(() => { + httpTestingController.verify(); + }); + + it('should be created', () => { + expect(underTest).toBeTruthy(); + }); + + it('getIngestionStatus() should return the ingestion status', () => { + const ingestionStatus = [IngestionStatusModelFactory.create('jobName', 'Hyperdrive', null)]; + const id = 1; + underTest.getIngestionStatus(id).subscribe( + (data) => expect(data).toEqual(ingestionStatus), + (error) => fail(error), + ); + + const req = httpTestingController.expectOne(api.GET_INGESTION_STATUS.replace('{id}', id.toString())); + expect(req.request.method).toEqual('GET'); + req.flush([...ingestionStatus]); + }); +}); diff --git a/ui/src/app/services/hyperdrive/hyperdrive.service.ts b/ui/src/app/services/hyperdrive/hyperdrive.service.ts new file mode 100644 index 000000000..c85d13ea6 --- /dev/null +++ b/ui/src/app/services/hyperdrive/hyperdrive.service.ts @@ -0,0 +1,39 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+
+import { Injectable } from '@angular/core';
+import { HttpClient } from '@angular/common/http';
+import { map } from 'rxjs/operators';
+import { Observable } from 'rxjs';
+import { IngestionStatusModel, IngestionStatusModelFactory, IngestionStatusResponseModel } from '../../models/ingestionStatus.model';
+import { api } from '../../constants/api.constants';
+
+@Injectable({
+  providedIn: 'root',
+})
+export class HyperdriveService {
+  constructor(private httpClient: HttpClient) {}
+
+  getIngestionStatus(id: number): Observable<IngestionStatusModel[]> {
+    return this.httpClient
+      .get<IngestionStatusResponseModel[]>(api.GET_INGESTION_STATUS.replace('{id}', id.toString()), { observe: 'response' })
+      .pipe(
+        map((response) => response.body),
+        map((response) =>
+          response.map((ingestionStatusResponse) => IngestionStatusModelFactory.fromIngestionStatusResponseModel(ingestionStatusResponse)),
+        ),
+      );
+  }
+}
diff --git a/ui/src/app/stores/workflows/workflows.actions.ts b/ui/src/app/stores/workflows/workflows.actions.ts
index 252e83118..39c612b25 100644
--- a/ui/src/app/stores/workflows/workflows.actions.ts
+++ b/ui/src/app/stores/workflows/workflows.actions.ts
@@ -21,6 +21,7 @@ import { HistoryModel, WorkflowHistoryModel } from '../../models/historyModel';
 import { JobForRunModel } from '../../models/jobForRun.model';
 import { JobTemplateModel } from '../../models/jobTemplate.model';
 import { TableSearchRequestModel } from '../../models/search/tableSearchRequest.model';
+import { IngestionStatusModel } from '../../models/ingestionStatus.model';
 
 export const INITIALIZE_WORKFLOWS = 'INITIALIZE_WORKFLOWS';
 export const INITIALIZE_WORKFLOWS_SUCCESS = 'INITIALIZE_WORKFLOWS_SUCCESS';
@@ -94,6 +95,10 @@ export const REVERT_WORKFLOW = 'REVERT_WORKFLOW';
 export const REVERT_WORKFLOW_SUCCESS = 'REVERT_WORKFLOW_SUCCESS';
 export const REVERT_WORKFLOW_FAILURE = 'REVERT_WORKFLOW_FAILURE';
 
+export const LOAD_INGESTION_STATUS = 'LOAD_INGESTION_STATUS';
+export const LOAD_INGESTION_STATUS_SUCCESS = 'LOAD_INGESTION_STATUS_SUCCESS';
+export const LOAD_INGESTION_STATUS_FAILURE = 'LOAD_INGESTION_STATUS_FAILURE';
+
 export class InitializeWorkflows implements Action {
   readonly type = INITIALIZE_WORKFLOWS;
 }
@@ -351,6 +356,20 @@ export class RevertWorkflowFailure implements Action {
   readonly type = REVERT_WORKFLOW_FAILURE;
 }
 
+export class LoadIngestionStatus implements Action {
+  readonly type = LOAD_INGESTION_STATUS;
+  constructor(public payload: number) {}
+}
+
+export class LoadIngestionStatusSuccess implements Action {
+  readonly type = LOAD_INGESTION_STATUS_SUCCESS;
+  constructor(public payload: IngestionStatusModel[]) {}
+}
+
+export class LoadIngestionStatusFailure implements Action {
+  readonly type = LOAD_INGESTION_STATUS_FAILURE;
+}
+
 export type WorkflowsActions =
   | InitializeWorkflows
   | InitializeWorkflowsSuccess
@@ -404,4 +423,7 @@ export type WorkflowsActions =
   | ImportWorkflowsFailure
   | RevertWorkflow
   | RevertWorkflowSuccess
-  | RevertWorkflowFailure;
+  | RevertWorkflowFailure
+  | LoadIngestionStatus
+  | LoadIngestionStatusSuccess
+  | LoadIngestionStatusFailure;
diff --git a/ui/src/app/stores/workflows/workflows.effects.spec.ts b/ui/src/app/stores/workflows/workflows.effects.spec.ts
index 84aa0932b..15776d9ad 100644
--- a/ui/src/app/stores/workflows/workflows.effects.spec.ts
+++ b/ui/src/app/stores/workflows/workflows.effects.spec.ts
@@ -37,6 +37,7 @@ import {
   RunWorkflows,
   RevertWorkflow,
   SearchWorkflows,
+  LoadIngestionStatus,
 } from './workflows.actions';
 
 import { WorkflowsEffects } from './workflows.effects';
@@ -67,12 +68,15 @@ import { SparkTemplateParametersModel } from '../../models/jobTemplateParameters.model';
 import * as WorkflowActions from './workflows.actions';
 import { TableSearchRequestModelFactory } from '../../models/search/tableSearchRequest.model';
 import { TableSearchResponseModel } from '../../models/search/tableSearchResponse.model';
+import { HyperdriveService } from '../../services/hyperdrive/hyperdrive.service';
+import { IngestionStatusModel, IngestionStatusModelFactory, TopicModel, TopicModelFactory } from '../../models/ingestionStatus.model';
 
 describe('WorkflowsEffects', () => {
   let underTest: WorkflowsEffects;
   let workflowService: WorkflowService;
   let workflowHistoryService: WorkflowHistoryService;
   let jobService: JobService;
+  let hyperdriveService: HyperdriveService;
   let mockActions: Observable<any>;
   let mockStore: MockStore;
   let toastrService: ToastrService;
@@ -106,6 +110,7 @@
     underTest = TestBed.inject(WorkflowsEffects);
     workflowService = TestBed.inject(WorkflowService);
     workflowHistoryService = TestBed.inject(WorkflowHistoryService);
+    hyperdriveService = TestBed.inject(HyperdriveService);
     jobService = TestBed.inject(JobService);
     mockActions = TestBed.inject(Actions);
     mockStore = TestBed.inject(MockStore);
@@ -1223,4 +1228,42 @@
       expect(result).toEqual(inputWorkflow);
     });
   });
+
+  describe('statusIngestionLoad', () => {
+    it('should load ingestion status', () => {
+      const payload = 1;
+      const response = [IngestionStatusModelFactory.create('jobName', 'jobType', TopicModelFactory.create('topic', 10))];
+
+      const action = new LoadIngestionStatus(payload);
+      mockActions = cold('-a', { a: action });
+      const getIngestionStatusResponse = cold('-a|', { a: response });
+      const expected = cold('--a', {
+        a: {
+          type: WorkflowActions.LOAD_INGESTION_STATUS_SUCCESS,
+          payload: response,
+        },
+      });
+
+      spyOn(hyperdriveService, 'getIngestionStatus').and.returnValue(getIngestionStatusResponse);
+
+      expect(underTest.statusIngestionLoad).toBeObservable(expected);
+    });
+
+    it('should catch failure when service fails to load ingestion status', () => {
+      const payload = 1;
+
+      const action = new LoadIngestionStatus(payload);
+      mockActions = cold('-a', { a: action });
+
+      const getIngestionStatusResponse = cold('-#');
+      spyOn(hyperdriveService, 'getIngestionStatus').and.returnValue(getIngestionStatusResponse);
+
+      const expected = cold('--a', {
+        a: {
+          type: WorkflowActions.LOAD_INGESTION_STATUS_FAILURE,
+        },
+      });
+      expect(underTest.statusIngestionLoad).toBeObservable(expected);
+    });
+  });
 });
diff --git a/ui/src/app/stores/workflows/workflows.effects.ts b/ui/src/app/stores/workflows/workflows.effects.ts
index 09b08ddcb..4874b7e54 100644
--- a/ui/src/app/stores/workflows/workflows.effects.ts
+++ b/ui/src/app/stores/workflows/workflows.effects.ts
@@ -42,6 +42,8 @@ import groupBy from 'lodash-es/groupBy';
 import { ApiUtil } from '../../utils/api/api.util';
 import { JobTemplateModel } from '../../models/jobTemplate.model';
 import { TableSearchResponseModel } from '../../models/search/tableSearchResponse.model';
+import { HyperdriveService } from '../../services/hyperdrive/hyperdrive.service';
+import { IngestionStatusModel } from '../../models/ingestionStatus.model';
 
 @Injectable()
 export class WorkflowsEffects {
@@ -50,6 +52,7 @@ export class WorkflowsEffects {
     private workflowService: WorkflowService,
     private workflowHistoryService: WorkflowHistoryService,
     private jobService: JobService,
+    private hyperdriveService: HyperdriveService,
     private store: Store,
     private router: Router,
     private toastrService: ToastrService,
@@ -697,6 +700,32 @@ export class WorkflowsEffects {
     );
   });
+
+  // loads the ingestion status (messages left to ingest per job) for the workflow with the given id
+  statusIngestionLoad = createEffect(() => {
+    return this.actions.pipe(
+      ofType(WorkflowActions.LOAD_INGESTION_STATUS),
+      switchMap((action: WorkflowActions.LoadIngestionStatus) => {
+        return this.hyperdriveService.getIngestionStatus(action.payload).pipe(
+          mergeMap((ingestionStatus: IngestionStatusModel[]) => {
+            return [
+              {
+                type: WorkflowActions.LOAD_INGESTION_STATUS_SUCCESS,
+                payload: ingestionStatus,
+              },
+            ];
+          }),
+          catchError(() => {
+            return [
+              {
+                type: WorkflowActions.LOAD_INGESTION_STATUS_FAILURE,
+              },
+            ];
+          }),
+        );
+      }),
+    );
+  });
   sortJobsInWorkflow(workflow: WorkflowJoinedModel): WorkflowJoinedModel {
     const sortedJobs = workflow.dagDefinitionJoined.jobDefinitions.sort((jobLeft, jobRight) => jobLeft.order - jobRight.order);
     return { ...workflow, dagDefinitionJoined: { ...workflow.dagDefinitionJoined, jobDefinitions: sortedJobs } };
   }
diff --git a/ui/src/app/stores/workflows/workflows.reducers.ts b/ui/src/app/stores/workflows/workflows.reducers.ts
index 1178e4185..801afd886 100644
--- a/ui/src/app/stores/workflows/workflows.reducers.ts
+++ b/ui/src/app/stores/workflows/workflows.reducers.ts
@@ -22,6 +22,7 @@ import { workflowModes } from '../../models/enums/workflowModes.constants';
 import { JobTemplateModel } from '../../models/jobTemplate.model';
 import { WorkflowModel } from '../../models/workflow.model';
 import { TableSearchRequestModel } from '../../models/search/tableSearchRequest.model';
+import { IngestionStatusModel } from '../../models/ingestionStatus.model';
 
 export interface State {
   workflowsSearch: {
@@ -59,6 +60,8 @@
     jobs: JobForRunModel[];
     isSuccessfullyLoaded: boolean;
   };
+  ingestionStatus: IngestionStatusModel[];
+  ingestionStatusLoading: boolean;
 }
 
 const initialState: State = {
@@ -97,6 +100,8 @@
     jobs: undefined,
     isSuccessfullyLoaded: false,
   },
+  ingestionStatusLoading: true,
+  ingestionStatus: [],
 };
 
 export function workflowsReducer(state: State = initialState, action: WorkflowsActions.WorkflowsActions) {
@@ -594,6 +599,24 @@ export function workflowsReducer(state: State = initialState, action: WorkflowsA
         loading: false,
       },
     };
+    case WorkflowsActions.LOAD_INGESTION_STATUS:
+      return {
+        ...state,
+        ingestionStatusLoading: true,
+        ingestionStatus: [],
+      };
+    case WorkflowsActions.LOAD_INGESTION_STATUS_SUCCESS:
+      return {
+        ...state,
+        ingestionStatusLoading: false,
+        ingestionStatus: action.payload,
+      };
+    case WorkflowsActions.LOAD_INGESTION_STATUS_FAILURE:
+      return {
+        ...state,
+        ingestionStatusLoading: false,
+        ingestionStatus: [],
+      };
     default:
       return state;
   }
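
For context, a minimal sketch of how a component might consume the new state slice: it dispatches LoadIngestionStatus with the workflow id and selects ingestionStatus / ingestionStatusLoading from the store. The component, its inline selectors, and the 'workflows' feature key are illustrative assumptions and are not part of this change:

import { Component, Input, OnInit } from '@angular/core';
import { Store } from '@ngrx/store';
import { Observable } from 'rxjs';
import { LoadIngestionStatus } from '../../stores/workflows/workflows.actions';
import { IngestionStatusModel } from '../../models/ingestionStatus.model';

@Component({
  selector: 'app-ingestion-status',
  template: `
    <div *ngIf="loading$ | async">Loading ingestion status...</div>
    <div *ngFor="let status of ingestionStatus$ | async">
      {{ status.jobName }} ({{ status.jobType }})
      <span *ngIf="status.topic">: {{ status.topic.messagesToIngest }} messages left on {{ status.topic.topic }}</span>
    </div>
  `,
})
export class IngestionStatusComponent implements OnInit {
  @Input() workflowId: number;

  // assumed feature key: the reducer above is registered under 'workflows'
  ingestionStatus$: Observable<IngestionStatusModel[]>;
  loading$: Observable<boolean>;

  constructor(private store: Store<any>) {}

  ngOnInit(): void {
    this.ingestionStatus$ = this.store.select((state) => state.workflows.ingestionStatus);
    this.loading$ = this.store.select((state) => state.workflows.ingestionStatusLoading);
    // dispatching triggers the statusIngestionLoad effect, which calls GET /hyperdrive/workflows/{id}/ingestionStatus
    this.store.dispatch(new LoadIngestionStatus(this.workflowId));
  }
}

Because the effect uses switchMap, re-dispatching for another workflow cancels any in-flight request, so the store only ever reflects the most recently requested workflow.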