diff --git a/README.md b/README.md index a62ade20e..169ec501f 100644 --- a/README.md +++ b/README.md @@ -209,6 +209,24 @@ The Hadoop configuration directory needs to be added as the environment variable - To add the Hadoop configuration directory to the application classpath, in the file `/conf/catalina.properties`, append to the key `shared.loader` the hadoop conf dir, e.g. `shared.loader="/opt/hadoop"`. +### Symbolic links for user-defined files +With [Feature #700: Skip dag instance creation if no new message is available in Kafka](https://github.com/AbsaOSS/hyperdrive-trigger/issues/700), +the application needs to access files that are defined in job templates for Hyperdrive jobs. In particular, it needs +to access any files specified under `reader.option.kafka` to configure Kafka consumers, e.g. keystores, truststores +and keytabs, under the same path as the Spark job would see them. + +For example, a (resolved) job template may include +- Additional files: `/etc/config/keystore.jks#keystore.jks` +- App arguments: `reader.option.kafka.ssl.keystore.location=keystore.jks` + +In this case, `/etc/config/keystore.jks` obviously needs to exist for the job to be submitted. Additionally, +`/keystore.jks` needs to exist so that the web application can access the file under the same path +as the Spark job would, and can thus create a Kafka consumer with the same configuration as the Spark job. This +can be achieved using symbolic links. + +For access to HDFS, `spark.yarn.keytab` and `spark.yarn.principal` from the application properties are used for authentication. +No symbolic links are required. + ## Embedded Tomcat For development purposes, hyperdrive-trigger can be executed as an application with an embedded tomcat. Please check out branch **feature/embedded-tomcat-2** to use it. 
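For illustration of the symbolic-link setup described in the README section above, here is a minimal Scala sketch using `java.nio.file`. The directory `/opt/hyperdrive-trigger` is a hypothetical stand-in for the web application's actual working directory; the equivalent shell command would be `ln -s /etc/config/keystore.jks /opt/hyperdrive-trigger/keystore.jks`.

```scala
import java.nio.file.{Files, Paths}

object CreateKeystoreLink {
  def main(args: Array[String]): Unit = {
    // Hypothetical working directory of the web application; the relative path
    // keystore.jks from the app arguments resolves against this directory.
    val link = Paths.get("/opt/hyperdrive-trigger/keystore.jks")
    // The file actually shipped to the Spark job via "Additional files".
    val target = Paths.get("/etc/config/keystore.jks")
    if (!Files.exists(link)) {
      Files.createSymbolicLink(link, target)
    }
  }
}
```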
diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointService.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointService.scala index 7b2742804..1a250c79d 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointService.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointService.scala @@ -15,10 +15,10 @@ package za.co.absa.hyperdrive.trigger.api.rest.services -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.hadoop.security.UserGroupInformation +import org.json4s.jackson.Serialization +import org.json4s.{Formats, NoTypeHints} import org.slf4j.LoggerFactory import org.springframework.stereotype.Service import za.co.absa.hyperdrive.trigger.api.rest.utils.ScalaUtil.swap @@ -44,9 +44,9 @@ class HdfsParameters( @Service class CheckpointServiceImpl @Inject() (hdfsService: HdfsService) extends CheckpointService { private val logger = LoggerFactory.getLogger(this.getClass) - private val mapper = new ObjectMapper().registerModule(DefaultScalaModule) private val offsetsDirName = "offsets" private val commitsDirName = "commits" + private implicit val formats: Formats = Serialization.formats(NoTypeHints) /** * See org.apache.spark.sql.execution.streaming.HDFSMetadataLog @@ -105,7 +105,7 @@ class CheckpointServiceImpl @Inject() (hdfsService: HdfsService) extends Checkpo val SERIALIZED_VOID_OFFSET = "-" def parseOffset(value: String): Option[TopicPartitionOffsets] = value match { case SERIALIZED_VOID_OFFSET => None - case json => Some(mapper.readValue(json, classOf[TopicPartitionOffsets])) + case json => Some(Serialization.read[TopicPartitionOffsets](json)) } if (!lines.hasNext) { throw new IllegalStateException("Incomplete log file") diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetComparisonService.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetComparisonService.scala index 8b7bed734..c46823718 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetComparisonService.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetComparisonService.scala @@ -93,7 +93,14 @@ class HyperdriveOffsetComparisonServiceImpl @Inject() (sparkConfig: SparkConfig, getCheckpointOffsets(jobParameters, kafkaParametersOpt).map { case Some(checkpointOffsets) => val allConsumed = offsetsConsumed(checkpointOffsets, kafkaEndOffsets) - logger.info(s"All offsets consumed for topic ${kafkaParametersOpt.get._1}. Skipping job instance") + if (allConsumed) { + logger.info(s"All offsets consumed for topic ${kafkaParametersOpt.get._1}. Skipping job instance") + } else { + logger.debug( + s"Some offsets haven't been consumed yet for topic ${kafkaParametersOpt.get._1}. 
Kafka offsets: ${kafkaEndOffsets}, " + + s"Checkpoint offsets: ${checkpointOffsets}" + ) + } !allConsumed case _ => true } diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/JobTemplateResolutionService.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/JobTemplateResolutionService.scala index cb4e76a1e..92b790d7e 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/JobTemplateResolutionService.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/JobTemplateResolutionService.scala @@ -19,6 +19,8 @@ import org.springframework.stereotype.Service import za.co.absa.hyperdrive.trigger.configuration.application.JobDefinitionConfig.{SparkExtraJavaOptions, SparkTags} import za.co.absa.hyperdrive.trigger.models._ import za.co.absa.hyperdrive.trigger.api.rest.utils.Extensions.{SparkConfigList, SparkConfigMap} +import za.co.absa.hyperdrive.trigger.models.enums.JobTypes +import za.co.absa.hyperdrive.trigger.models.enums.JobTypes.JobType import scala.util.{Failure, Success, Try} @@ -99,6 +101,7 @@ class JobTemplateResolutionServiceImpl extends JobTemplateResolutionService { templateParams: SparkTemplateParameters ): SparkInstanceParameters = SparkInstanceParameters( + jobType = mergeSparkJobType(definitionParams.jobType, templateParams.jobType), jobJar = mergeOptionString(definitionParams.jobJar, templateParams.jobJar), mainClass = mergeOptionString(definitionParams.mainClass, templateParams.mainClass), appArguments = mergeLists(definitionParams.appArguments, templateParams.appArguments), @@ -111,6 +114,14 @@ class JobTemplateResolutionServiceImpl extends JobTemplateResolutionService { ).toAdditionalSparkConfigList ) + private def mergeSparkJobType(definitionJobType: JobType, templateJobType: JobType) = { + if (definitionJobType == templateJobType) { + definitionJobType + } else { + JobTypes.Spark + } + } + private def mergeShellParameters( definitionParams: ShellDefinitionParameters, templateParams: ShellTemplateParameters diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaService.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaService.scala index 2a8f1fde4..b2f2ee3ef 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaService.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaService.scala @@ -36,6 +36,7 @@ trait KafkaService { @Service class KafkaServiceImpl @Inject() (generalConfig: GeneralConfig) extends KafkaService { private val logger = LoggerFactory.getLogger(this.getClass) + private val consumerUuid = randomUUID().toString private val kafkaConsumersCache = new ConcurrentLruCache[(Properties, Long), KafkaConsumer[String, String]]( generalConfig.kafkaConsumersCacheSize, createKafkaConsumer @@ -58,7 +59,7 @@ class KafkaServiceImpl @Inject() (generalConfig: GeneralConfig) extends KafkaSer } private def getOffsets(topic: String, properties: Properties, offsetFn: OffsetFunction): Map[Int, Long] = { - val groupId = s"hyperdrive-trigger-kafkaService-${randomUUID().toString}" + val groupId = s"hyperdrive-trigger-kafkaService-$consumerUuid" properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId) val consumer = kafkaConsumersCache.get((properties, Thread.currentThread().getId)) val partitionInfo = Option(consumer.partitionsFor(topic)).map(_.asScala).getOrElse(Seq()) diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/configuration/application/SchedulerConfig.scala 
b/src/main/scala/za/co/absa/hyperdrive/trigger/configuration/application/SchedulerConfig.scala index 8eeabea23..627613e49 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/configuration/application/SchedulerConfig.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/configuration/application/SchedulerConfig.scala @@ -57,5 +57,7 @@ class Sensors( class Executors( @NotNull @Name("thread.pool.size") - val threadPoolSize: Int + val threadPoolSize: Int, + @DefaultValue(Array("true")) + val enableHyperdriveExecutor: Boolean ) diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/models/enums/JobStatuses.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/models/enums/JobStatuses.scala index 9ae317a43..496f5991e 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/models/enums/JobStatuses.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/models/enums/JobStatuses.scala @@ -37,6 +37,7 @@ object JobStatuses { case object InvalidExecutor extends JobStatus("InvalidExecutor", true, true, false) case object FailedPreviousJob extends JobStatus("FailedPreviousJob", true, true, false) case object Skipped extends JobStatus("Skipped", true, false, false) + case object NoData extends JobStatus("No Data", true, false, false) val statuses: Set[JobStatus] = Set( InQueue, @@ -49,7 +50,8 @@ object JobStatuses { SubmissionTimeout, InvalidExecutor, FailedPreviousJob, - Skipped + Skipped, + NoData ) val finalStatuses: Set[JobStatus] = statuses.filter(_.isFinalStatus) val nonFinalStatuses: Set[JobStatus] = statuses.filter(!_.isFinalStatus) diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala index 50f7eb508..cecd49394 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala @@ -20,9 +20,10 @@ import java.util.concurrent import javax.inject.Inject import za.co.absa.hyperdrive.trigger.models.{DagInstance, JobInstance, ShellInstanceParameters, SparkInstanceParameters} import za.co.absa.hyperdrive.trigger.models.enums.JobStatuses.InvalidExecutor -import za.co.absa.hyperdrive.trigger.models.enums.{DagInstanceStatuses, JobStatuses} +import za.co.absa.hyperdrive.trigger.models.enums.{DagInstanceStatuses, JobStatuses, JobTypes} import za.co.absa.hyperdrive.trigger.persistance.{DagInstanceRepository, JobInstanceRepository} import za.co.absa.hyperdrive.trigger.scheduler.executors.spark.{ + HyperdriveExecutor, SparkClusterService, SparkEmrClusterServiceImpl, SparkExecutor, @@ -32,6 +33,7 @@ import org.slf4j.LoggerFactory import org.springframework.beans.factory.BeanFactory import za.co.absa.hyperdrive.trigger.scheduler.executors.shell.ShellExecutor import org.springframework.stereotype.Component +import za.co.absa.hyperdrive.trigger.api.rest.services.HyperdriveOffsetComparisonService import za.co.absa.hyperdrive.trigger.configuration.application.{SchedulerConfig, SparkConfig} import za.co.absa.hyperdrive.trigger.scheduler.notifications.NotificationSender @@ -45,7 +47,8 @@ class Executors @Inject() ( notificationSender: NotificationSender, beanFactory: BeanFactory, implicit val sparkConfig: SparkConfig, - schedulerConfig: SchedulerConfig + schedulerConfig: SchedulerConfig, + hyperdriveOffsetComparisonService: HyperdriveOffsetComparisonService ) { private val logger = LoggerFactory.getLogger(this.getClass) private implicit val executionContext: 
ExecutionContextExecutor = @@ -79,6 +82,19 @@ class Executors @Inject() ( case _ => } fut + case jobInstances + if jobInstances.forall(ji => ji.jobStatus.isFinalStatus && ji.jobStatus == JobStatuses.NoData) => + val updatedDagInstance = + dagInstance.copy(status = DagInstanceStatuses.Skipped, finished = Option(LocalDateTime.now())) + val fut = for { + _ <- dagInstanceRepository.update(updatedDagInstance) + } yield {} + fut.onComplete { + case Failure(exception) => + logger.error(s"Updating status failed for skipped run. Dag instance id = ${dagInstance.id}", exception) + case _ => + } + fut case jobInstances if jobInstances.forall(ji => ji.jobStatus.isFinalStatus && !ji.jobStatus.isFailed) => val updatedDagInstance = dagInstance.copy(status = DagInstanceStatuses.Succeeded, finished = Option(LocalDateTime.now())) @@ -98,6 +114,10 @@ class Executors @Inject() ( jobInstance match { case Some(ji) => ji.jobParameters match { + case hyperdrive: SparkInstanceParameters + if hyperdrive.jobType == JobTypes.Hyperdrive && useHyperExecutor(hyperdrive) => + HyperdriveExecutor + .execute(ji, hyperdrive, updateJob, sparkClusterService, hyperdriveOffsetComparisonService) case spark: SparkInstanceParameters => SparkExecutor.execute(ji, spark, updateJob, sparkClusterService) case shell: ShellInstanceParameters => ShellExecutor.execute(ji, shell, updateJob) case _ => updateJob(ji.copy(jobStatus = InvalidExecutor)) @@ -115,6 +135,12 @@ class Executors @Inject() ( fut } + private def useHyperExecutor(parameters: SparkInstanceParameters) = { + schedulerConfig.executors.enableHyperdriveExecutor && + parameters.jobType == JobTypes.Hyperdrive && + parameters.appArguments.contains("useHyperdriveExecutor=true") + } + private def updateJob(jobInstance: JobInstance): Future[Unit] = { logger.info( s"Job updated. ID = ${jobInstance.id} STATUS = ${jobInstance.jobStatus} EXECUTOR_ID = ${jobInstance.executorJobId}" diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala new file mode 100644 index 000000000..0b936e9f9 --- /dev/null +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala @@ -0,0 +1,56 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.hyperdrive.trigger.scheduler.executors.spark + +import org.slf4j.LoggerFactory +import za.co.absa.hyperdrive.trigger.api.rest.services.HyperdriveOffsetComparisonService +import za.co.absa.hyperdrive.trigger.configuration.application.SparkConfig +import za.co.absa.hyperdrive.trigger.models.enums.JobStatuses +import za.co.absa.hyperdrive.trigger.models.{JobInstance, SparkInstanceParameters} + +import scala.concurrent.{ExecutionContext, Future} + +object HyperdriveExecutor { + private val logger = LoggerFactory.getLogger(this.getClass) + + def execute( + jobInstance: JobInstance, + jobParameters: SparkInstanceParameters, + updateJob: JobInstance => Future[Unit], + sparkClusterService: SparkClusterService, + offsetComparisonService: HyperdriveOffsetComparisonService + )(implicit executionContext: ExecutionContext, sparkConfig: SparkConfig): Future[Unit] = + jobInstance.executorJobId match { + case None => submitJob(sparkClusterService, offsetComparisonService, jobInstance, jobParameters, updateJob) + case Some(executorJobId) => + SparkExecutor.updateJobStatus(executorJobId, jobInstance, updateJob, sparkClusterService) + } + + private def submitJob(sparkClusterService: SparkClusterService, + offsetComparisonService: HyperdriveOffsetComparisonService, + jobInstance: JobInstance, + jobParameters: SparkInstanceParameters, + updateJob: JobInstance => Future[Unit] + )(implicit executionContext: ExecutionContext) = { + logger.debug("Using HyperdriveExecutor") + for { + newJobRequired <- offsetComparisonService.isNewJobInstanceRequired(jobParameters) + _ <- + if (newJobRequired) sparkClusterService.submitJob(jobInstance, jobParameters, updateJob) + else updateJob(jobInstance.copy(jobStatus = JobStatuses.NoData)) + } yield () + } +} diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkExecutor.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkExecutor.scala index 66ecf1be3..7a71e39a3 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkExecutor.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkExecutor.scala @@ -37,7 +37,7 @@ object SparkExecutor { case Some(executorJobId) => updateJobStatus(executorJobId, jobInstance, updateJob, sparkClusterService) } - private def updateJobStatus( + private[spark] def updateJobStatus( executorJobId: String, jobInstance: JobInstance, updateJob: JobInstance => Future[Unit], diff --git a/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointServiceTest.scala b/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointServiceTest.scala index 8f4a49ba3..e81567158 100644 --- a/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointServiceTest.scala +++ b/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointServiceTest.scala @@ -72,9 +72,9 @@ class CheckpointServiceTest extends FlatSpec with Matchers with BeforeAndAfter w result.size shouldBe 2 result.head._1 shouldBe "my.topic" - result.head._2 should contain theSameElementsAs Map("2" -> 2021, "1" -> 1021, "3" -> 3021, "0" -> 21) + result.head._2 should contain theSameElementsAs Map(2 -> 2021L, 1 -> 1021L, 3 -> 3021L, 0 -> 21L) result.toSeq(1)._1 shouldBe "my.other.topic" - result.toSeq(1)._2 should contain theSameElementsAs Map("0" -> 0) + result.toSeq(1)._2 should contain theSameElementsAs Map(0 -> 0L) } "getLatestOffsetFile" should "get the latest offset file, and it is committed" 
in { diff --git a/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/utils/JobTemplateResolutionServiceTest.scala b/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/utils/JobTemplateResolutionServiceTest.scala index 8daead0db..dc4f8c597 100644 --- a/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/utils/JobTemplateResolutionServiceTest.scala +++ b/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/utils/JobTemplateResolutionServiceTest.scala @@ -282,7 +282,7 @@ class JobTemplateResolutionServiceTest extends FlatSpec with Matchers { ) // then - templateDefined.head.jobParameters.jobType shouldBe JobTypes.Spark + templateDefined.head.jobParameters.jobType shouldBe JobTypes.Hyperdrive templateDefined.head.jobParameters .asInstanceOf[SparkInstanceParameters] .jobJar shouldBe sparkTemplateParametersDefined.jobJar @@ -302,7 +302,7 @@ class JobTemplateResolutionServiceTest extends FlatSpec with Matchers { .asInstanceOf[SparkInstanceParameters] .additionalSparkConfig should contain theSameElementsAs sparkTemplateParametersDefined.additionalSparkConfig - bothScriptsDefined.head.jobParameters.jobType shouldBe JobTypes.Spark + bothScriptsDefined.head.jobParameters.jobType shouldBe JobTypes.Hyperdrive bothScriptsDefined.head.jobParameters .asInstanceOf[SparkInstanceParameters] .jobJar shouldBe sparkTemplateParametersDefined.jobJar diff --git a/src/test/scala/za/co/absa/hyperdrive/trigger/configuration/application/TestSchedulerConfig.scala b/src/test/scala/za/co/absa/hyperdrive/trigger/configuration/application/TestSchedulerConfig.scala index 8873c22b9..10ce845a2 100644 --- a/src/test/scala/za/co/absa/hyperdrive/trigger/configuration/application/TestSchedulerConfig.scala +++ b/src/test/scala/za/co/absa/hyperdrive/trigger/configuration/application/TestSchedulerConfig.scala @@ -33,6 +33,6 @@ object TestSchedulerConfig { autostart, lagThreshold, new Sensors(sensorsThreadPoolSize, sensorsChangedSensorsChunkQuerySize), - new Executors(executorsThreadPoolSize) + new Executors(executorsThreadPoolSize, true) ) } diff --git a/src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutorTest.scala b/src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutorTest.scala new file mode 100644 index 000000000..46c629af5 --- /dev/null +++ b/src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutorTest.scala @@ -0,0 +1,110 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.hyperdrive.trigger.scheduler.executors.spark + +import org.mockito.ArgumentCaptor +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito.{never, reset, verify, when} +import org.scalatest.mockito.MockitoSugar +import org.scalatest.{AsyncFlatSpec, BeforeAndAfter, Matchers} +import za.co.absa.hyperdrive.trigger.api.rest.services.HyperdriveOffsetComparisonService +import za.co.absa.hyperdrive.trigger.configuration.application.{DefaultTestSparkConfig, SparkConfig} +import za.co.absa.hyperdrive.trigger.models.enums.JobStatuses.InQueue +import za.co.absa.hyperdrive.trigger.models.enums.{JobStatuses, JobTypes} +import za.co.absa.hyperdrive.trigger.models.{JobInstance, SparkInstanceParameters} + +import java.time.LocalDateTime +import scala.concurrent.Future + +class HyperdriveExecutorTest extends AsyncFlatSpec with MockitoSugar with BeforeAndAfter with Matchers { + private val offsetComparisonServiceMock = mock[HyperdriveOffsetComparisonService] + private val sparkClusterServiceMock = mock[SparkClusterService] + private val updateJobStub: JobInstance => Future[Unit] = mock[JobInstance => Future[Unit]] + + before { + reset(offsetComparisonServiceMock) + reset(sparkClusterServiceMock) + reset(updateJobStub) + } + + it should "submit a job if it is required" in { + val jobInstance = getJobInstance + val jobInstanceParameters = jobInstance.jobParameters.asInstanceOf[SparkInstanceParameters] + + when(offsetComparisonServiceMock.isNewJobInstanceRequired(any())(any())).thenReturn(Future { true }) + when(sparkClusterServiceMock.submitJob(any(), any(), any())).thenReturn(Future { (): Unit }) + when(updateJobStub.apply(any[JobInstance])).thenReturn(Future { (): Unit }) + + implicit val sparkConfig: SparkConfig = DefaultTestSparkConfig().yarn + val resultFut = HyperdriveExecutor.execute(jobInstance, + jobInstanceParameters, + updateJobStub, + sparkClusterServiceMock, + offsetComparisonServiceMock + ) + + resultFut.map { _ => + verify(sparkClusterServiceMock).submitJob(any(), any(), any()) + verify(updateJobStub, never()).apply(any()) + succeed + } + } + + it should "not submit a job if it is not required" in { + val jobInstance = getJobInstance + val jobInstanceParameters = jobInstance.jobParameters.asInstanceOf[SparkInstanceParameters] + + when(offsetComparisonServiceMock.isNewJobInstanceRequired(any())(any())).thenReturn(Future { false }) + when(sparkClusterServiceMock.submitJob(any(), any(), any())).thenReturn(Future { (): Unit }) + when(updateJobStub.apply(any[JobInstance])).thenReturn(Future { (): Unit }) + + implicit val sparkConfig: SparkConfig = DefaultTestSparkConfig().yarn + val resultFut = HyperdriveExecutor.execute(jobInstance, + jobInstanceParameters, + updateJobStub, + sparkClusterServiceMock, + offsetComparisonServiceMock + ) + + resultFut.map { _ => + verify(sparkClusterServiceMock, never()).submitJob(any(), any(), any()) + val jiCaptor: ArgumentCaptor[JobInstance] = ArgumentCaptor.forClass(classOf[JobInstance]) + verify(updateJobStub).apply(jiCaptor.capture()) + jiCaptor.getValue.jobStatus shouldBe JobStatuses.NoData + } + } + + private def getJobInstance = { + val jobInstanceParameters = SparkInstanceParameters( + jobType = JobTypes.Hyperdrive, + jobJar = "job.jar", + mainClass = "mainClass", + appArguments = List() + ) + JobInstance( + jobName = "jobName", + jobParameters = jobInstanceParameters, + jobStatus = InQueue, + executorJobId = None, + applicationId = None, + stepId = None, + created = LocalDateTime.now(), + updated = None, + order = 0, + 
dagInstanceId = 0 + ) + } +} diff --git a/ui/src/app/components/runs/run-detail/run-detail.component.html b/ui/src/app/components/runs/run-detail/run-detail.component.html index 964db984c..ab02f5994 100644 --- a/ui/src/app/components/runs/run-detail/run-detail.component.html +++ b/ui/src/app/components/runs/run-detail/run-detail.component.html @@ -45,6 +45,7 @@ + {{jobInstance.jobStatus.name}} diff --git a/ui/src/app/models/enums/jobStatuses.constants.ts b/ui/src/app/models/enums/jobStatuses.constants.ts index 1e0a4ad57..9a738ed9d 100644 --- a/ui/src/app/models/enums/jobStatuses.constants.ts +++ b/ui/src/app/models/enums/jobStatuses.constants.ts @@ -21,4 +21,5 @@ export const jobStatuses = { FAILED: 'Failed', SKIPPED: 'Skipped', SUBMISSION_TIMEOUT: 'Submission timeout', + NO_DATA: 'No Data', };
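Taken together, the changes above gate the new executor on three conditions: the scheduler-level flag `enableHyperdriveExecutor` (default `true`), a resolved job type of `Hyperdrive`, and an explicit `useHyperdriveExecutor=true` app argument; otherwise the job falls back to the plain `SparkExecutor`. Below is a self-contained sketch of that predicate, using simplified stand-ins for the model and config classes in the diff:

```scala
object HyperdriveExecutorGate {
  // Simplified stand-ins for JobTypes and SparkInstanceParameters in the diff.
  sealed trait JobType
  case object Spark extends JobType
  case object Hyperdrive extends JobType

  final case class SparkParams(jobType: JobType, appArguments: List[String])

  // Mirrors useHyperExecutor in Executors.scala: all three conditions must hold.
  def useHyperExecutor(enableHyperdriveExecutor: Boolean, params: SparkParams): Boolean =
    enableHyperdriveExecutor &&
      params.jobType == Hyperdrive &&
      params.appArguments.contains("useHyperdriveExecutor=true")

  def main(args: Array[String]): Unit = {
    val optedIn = SparkParams(Hyperdrive, List("useHyperdriveExecutor=true"))
    val ordinary = SparkParams(Hyperdrive, List())
    println(useHyperExecutor(enableHyperdriveExecutor = true, optedIn))  // true
    println(useHyperExecutor(enableHyperdriveExecutor = true, ordinary)) // false
  }
}
```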