From f190179510f82c305de03c3564640ed61a2e9d8d Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Thu, 14 Jul 2022 11:36:33 +0200 Subject: [PATCH 01/12] wip --- .../executors/spark/HyperdriveExecutor.scala | 41 +++++++++++++++++++ .../executors/spark/SparkExecutor.scala | 2 +- 2 files changed, 42 insertions(+), 1 deletion(-) create mode 100644 src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala new file mode 100644 index 000000000..2de0bf790 --- /dev/null +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala @@ -0,0 +1,41 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.hyperdrive.trigger.scheduler.executors.spark + +import za.co.absa.hyperdrive.trigger.api.rest.services.HyperdriveOffsetComparisonService +import za.co.absa.hyperdrive.trigger.configuration.application.SparkConfig +import za.co.absa.hyperdrive.trigger.models.{JobInstance, SparkInstanceParameters} + +import scala.concurrent.{ExecutionContext, Future} + +object HyperdriveExecutor { + def execute( + jobInstance: JobInstance, + jobParameters: SparkInstanceParameters, + updateJob: JobInstance => Future[Unit], + sparkClusterService: SparkClusterService, + offsetComparisonService: HyperdriveOffsetComparisonService + )(implicit executionContext: ExecutionContext, sparkConfig: SparkConfig): Future[Unit] = + jobInstance.executorJobId match { + case None => submitJob(sparkClusterService, offsetComparisonService, jobInstance, jobParameters, updateJob) + case Some(executorJobId) => SparkExecutor.updateJobStatus(executorJobId, jobInstance, updateJob, sparkClusterService) + } + + private def submitJob(sparkClusterService: SparkClusterService, offsetComparisonService: HyperdriveOffsetComparisonService, jobInstance: JobInstance, jobParameters: SparkInstanceParameters, updateJob: JobInstance => Future[Unit]) = { + offsetComparisonService.isNewJobInstanceRequired() + sparkClusterService.submitJob(jobInstance, jobParameters, updateJob) + } +} diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkExecutor.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkExecutor.scala index 66ecf1be3..7a71e39a3 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkExecutor.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkExecutor.scala @@ -37,7 +37,7 @@ object SparkExecutor { case Some(executorJobId) => updateJobStatus(executorJobId, jobInstance, updateJob, sparkClusterService) } - private def updateJobStatus( + private[spark] def updateJobStatus( executorJobId: String, jobInstance: JobInstance, updateJob: JobInstance => Future[Unit], From 
5e18d36c520c99e850f159e35b383504b6004b7e Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Thu, 14 Jul 2022 13:35:19 +0200 Subject: [PATCH 02/12] wip --- .../trigger/models/enums/JobStatuses.scala | 4 +++- .../trigger/scheduler/executors/Executors.scala | 14 ++++++-------- .../executors/spark/HyperdriveExecutor.scala | 11 ++++++++--- .../runs/run-detail/run-detail.component.html | 1 + ui/src/app/models/enums/jobStatuses.constants.ts | 1 + 5 files changed, 19 insertions(+), 12 deletions(-) diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/models/enums/JobStatuses.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/models/enums/JobStatuses.scala index 9ae317a43..496f5991e 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/models/enums/JobStatuses.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/models/enums/JobStatuses.scala @@ -37,6 +37,7 @@ object JobStatuses { case object InvalidExecutor extends JobStatus("InvalidExecutor", true, true, false) case object FailedPreviousJob extends JobStatus("FailedPreviousJob", true, true, false) case object Skipped extends JobStatus("Skipped", true, false, false) + case object NoData extends JobStatus("No Data", true, false, false) val statuses: Set[JobStatus] = Set( InQueue, @@ -49,7 +50,8 @@ object JobStatuses { SubmissionTimeout, InvalidExecutor, FailedPreviousJob, - Skipped + Skipped, + NoData ) val finalStatuses: Set[JobStatus] = statuses.filter(_.isFinalStatus) val nonFinalStatuses: Set[JobStatus] = statuses.filter(!_.isFinalStatus) diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala index 50f7eb508..f5463fe8b 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala @@ -20,18 +20,14 @@ import java.util.concurrent import javax.inject.Inject import za.co.absa.hyperdrive.trigger.models.{DagInstance, JobInstance, ShellInstanceParameters, SparkInstanceParameters} import za.co.absa.hyperdrive.trigger.models.enums.JobStatuses.InvalidExecutor -import za.co.absa.hyperdrive.trigger.models.enums.{DagInstanceStatuses, JobStatuses} +import za.co.absa.hyperdrive.trigger.models.enums.{DagInstanceStatuses, JobStatuses, JobTypes} import za.co.absa.hyperdrive.trigger.persistance.{DagInstanceRepository, JobInstanceRepository} -import za.co.absa.hyperdrive.trigger.scheduler.executors.spark.{ - SparkClusterService, - SparkEmrClusterServiceImpl, - SparkExecutor, - SparkYarnClusterServiceImpl -} +import za.co.absa.hyperdrive.trigger.scheduler.executors.spark.{HyperdriveExecutor, SparkClusterService, SparkEmrClusterServiceImpl, SparkExecutor, SparkYarnClusterServiceImpl} import org.slf4j.LoggerFactory import org.springframework.beans.factory.BeanFactory import za.co.absa.hyperdrive.trigger.scheduler.executors.shell.ShellExecutor import org.springframework.stereotype.Component +import za.co.absa.hyperdrive.trigger.api.rest.services.HyperdriveOffsetComparisonService import za.co.absa.hyperdrive.trigger.configuration.application.{SchedulerConfig, SparkConfig} import za.co.absa.hyperdrive.trigger.scheduler.notifications.NotificationSender @@ -45,7 +41,8 @@ class Executors @Inject() ( notificationSender: NotificationSender, beanFactory: BeanFactory, implicit val sparkConfig: SparkConfig, - schedulerConfig: SchedulerConfig + schedulerConfig: SchedulerConfig, + hyperdriveOffsetComparisonService: 
HyperdriveOffsetComparisonService, ) { private val logger = LoggerFactory.getLogger(this.getClass) private implicit val executionContext: ExecutionContextExecutor = @@ -98,6 +95,7 @@ class Executors @Inject() ( jobInstance match { case Some(ji) => ji.jobParameters match { + case hyperdrive: SparkInstanceParameters if hyperdrive.jobType == JobTypes.Hyperdrive => HyperdriveExecutor.execute(ji, hyperdrive, updateJob, sparkClusterService, hyperdriveOffsetComparisonService) case spark: SparkInstanceParameters => SparkExecutor.execute(ji, spark, updateJob, sparkClusterService) case shell: ShellInstanceParameters => ShellExecutor.execute(ji, shell, updateJob) case _ => updateJob(ji.copy(jobStatus = InvalidExecutor)) diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala index 2de0bf790..68ecec987 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala @@ -17,7 +17,9 @@ package za.co.absa.hyperdrive.trigger.scheduler.executors.spark import za.co.absa.hyperdrive.trigger.api.rest.services.HyperdriveOffsetComparisonService import za.co.absa.hyperdrive.trigger.configuration.application.SparkConfig +import za.co.absa.hyperdrive.trigger.models.enums.JobStatuses import za.co.absa.hyperdrive.trigger.models.{JobInstance, SparkInstanceParameters} +import za.co.absa.hyperdrive.trigger.scheduler.executors.spark.HyperdriveExecutor.submitJob import scala.concurrent.{ExecutionContext, Future} @@ -34,8 +36,11 @@ object HyperdriveExecutor { case Some(executorJobId) => SparkExecutor.updateJobStatus(executorJobId, jobInstance, updateJob, sparkClusterService) } - private def submitJob(sparkClusterService: SparkClusterService, offsetComparisonService: HyperdriveOffsetComparisonService, jobInstance: JobInstance, jobParameters: SparkInstanceParameters, updateJob: JobInstance => Future[Unit]) = { - offsetComparisonService.isNewJobInstanceRequired() - sparkClusterService.submitJob(jobInstance, jobParameters, updateJob) + private def submitJob(sparkClusterService: SparkClusterService, offsetComparisonService: HyperdriveOffsetComparisonService, jobInstance: JobInstance, jobParameters: SparkInstanceParameters, updateJob: JobInstance => Future[Unit])(implicit executionContext: ExecutionContext) = { + for { + newJobRequired <- offsetComparisonService.isNewJobInstanceRequired(jobInstance.jobParameters) + _ <- sparkClusterService.submitJob(jobInstance, jobParameters, updateJob) if newJobRequired + _ <- updateJob(jobInstance.copy(jobStatus = JobStatuses.NoData)) if !newJobRequired + } yield () } } diff --git a/ui/src/app/components/runs/run-detail/run-detail.component.html b/ui/src/app/components/runs/run-detail/run-detail.component.html index 964db984c..2fd74107a 100644 --- a/ui/src/app/components/runs/run-detail/run-detail.component.html +++ b/ui/src/app/components/runs/run-detail/run-detail.component.html @@ -45,6 +45,7 @@ + {{jobInstance.jobStatus.name}} diff --git a/ui/src/app/models/enums/jobStatuses.constants.ts b/ui/src/app/models/enums/jobStatuses.constants.ts index 1e0a4ad57..9a738ed9d 100644 --- a/ui/src/app/models/enums/jobStatuses.constants.ts +++ b/ui/src/app/models/enums/jobStatuses.constants.ts @@ -21,4 +21,5 @@ export const jobStatuses = { FAILED: 'Failed', SKIPPED: 'Skipped', SUBMISSION_TIMEOUT: 'Submission 
timeout', + NO_DATA: 'No Data', }; From 0df44bf0268b550afff6b3c614ab0e4ccfbf20d8 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Mon, 18 Jul 2022 11:15:05 +0200 Subject: [PATCH 03/12] Add HyperdriveExecutor --- .../executors/spark/HyperdriveExecutor.scala | 6 +- .../spark/HyperdriveExecutorTest.scala | 101 ++++++++++++++++++ 2 files changed, 104 insertions(+), 3 deletions(-) create mode 100644 src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutorTest.scala diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala index 68ecec987..bf0190754 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala @@ -38,9 +38,9 @@ object HyperdriveExecutor { private def submitJob(sparkClusterService: SparkClusterService, offsetComparisonService: HyperdriveOffsetComparisonService, jobInstance: JobInstance, jobParameters: SparkInstanceParameters, updateJob: JobInstance => Future[Unit])(implicit executionContext: ExecutionContext) = { for { - newJobRequired <- offsetComparisonService.isNewJobInstanceRequired(jobInstance.jobParameters) - _ <- sparkClusterService.submitJob(jobInstance, jobParameters, updateJob) if newJobRequired - _ <- updateJob(jobInstance.copy(jobStatus = JobStatuses.NoData)) if !newJobRequired + newJobRequired <- offsetComparisonService.isNewJobInstanceRequired(jobParameters) + _ <- if (newJobRequired) sparkClusterService.submitJob(jobInstance, jobParameters, updateJob) + else updateJob(jobInstance.copy(jobStatus = JobStatuses.NoData)) } yield () } } diff --git a/src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutorTest.scala b/src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutorTest.scala new file mode 100644 index 000000000..cb8ea967d --- /dev/null +++ b/src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutorTest.scala @@ -0,0 +1,101 @@ + +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.hyperdrive.trigger.scheduler.executors.spark + +import org.mockito.ArgumentCaptor +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito.{never, reset, verify, when} +import org.scalatest.mockito.MockitoSugar +import org.scalatest.{AsyncFlatSpec, BeforeAndAfter, Matchers} +import za.co.absa.hyperdrive.trigger.api.rest.services.HyperdriveOffsetComparisonService +import za.co.absa.hyperdrive.trigger.configuration.application.{DefaultTestSparkConfig, SparkConfig} +import za.co.absa.hyperdrive.trigger.models.enums.JobStatuses.InQueue +import za.co.absa.hyperdrive.trigger.models.enums.{JobStatuses, JobTypes} +import za.co.absa.hyperdrive.trigger.models.{JobInstance, SparkInstanceParameters} + +import java.time.LocalDateTime +import scala.concurrent.Future + +class HyperdriveExecutorTest extends AsyncFlatSpec with MockitoSugar with BeforeAndAfter with Matchers { + private val offsetComparisonServiceMock = mock[HyperdriveOffsetComparisonService] + private val sparkClusterServiceMock = mock[SparkClusterService] + private val updateJobStub: JobInstance => Future[Unit] = mock[JobInstance => Future[Unit]] + + before { + reset(offsetComparisonServiceMock) + reset(sparkClusterServiceMock) + reset(updateJobStub) + } + + it should "submit a job if it is required" in { + val jobInstance = getJobInstance + val jobInstanceParameters = jobInstance.jobParameters.asInstanceOf[SparkInstanceParameters] + + when(offsetComparisonServiceMock.isNewJobInstanceRequired(any())(any())).thenReturn(Future{true}) + when(sparkClusterServiceMock.submitJob(any(), any(), any())).thenReturn(Future {(): Unit}) + when(updateJobStub.apply(any[JobInstance])).thenReturn(Future {(): Unit}) + + implicit val sparkConfig: SparkConfig = DefaultTestSparkConfig().yarn + val resultFut = HyperdriveExecutor.execute(jobInstance, jobInstanceParameters, updateJobStub, sparkClusterServiceMock, offsetComparisonServiceMock) + + resultFut.map { _ => + verify(sparkClusterServiceMock).submitJob(any(), any(), any()) + verify(updateJobStub, never()).apply(any()) + succeed + } + } + + it should "not submit a job if it is not required" in { + val jobInstance = getJobInstance + val jobInstanceParameters = jobInstance.jobParameters.asInstanceOf[SparkInstanceParameters] + + when(offsetComparisonServiceMock.isNewJobInstanceRequired(any())(any())).thenReturn(Future{false}) + when(sparkClusterServiceMock.submitJob(any(), any(), any())).thenReturn(Future {(): Unit}) + when(updateJobStub.apply(any[JobInstance])).thenReturn(Future {(): Unit}) + + implicit val sparkConfig: SparkConfig = DefaultTestSparkConfig().yarn + val resultFut = HyperdriveExecutor.execute(jobInstance, jobInstanceParameters, updateJobStub, sparkClusterServiceMock, offsetComparisonServiceMock) + + resultFut.map { _ => + verify(sparkClusterServiceMock, never()).submitJob(any(), any(), any()) + val jiCaptor: ArgumentCaptor[JobInstance] = ArgumentCaptor.forClass(classOf[JobInstance]) + verify(updateJobStub).apply(jiCaptor.capture()) + jiCaptor.getValue.jobStatus shouldBe JobStatuses.NoData + } + } + + private def getJobInstance = { + val jobInstanceParameters = SparkInstanceParameters( + jobType = JobTypes.Hyperdrive, + jobJar = "job.jar", + mainClass = "mainClass", + appArguments = List() + ) + JobInstance( + jobName = "jobName", + jobParameters = jobInstanceParameters, + jobStatus = InQueue, + executorJobId = None, + applicationId = None, + stepId = None, + created = LocalDateTime.now(), + updated = None, + order = 0, + dagInstanceId = 0 + ) + } +} From 
d47648c4b3040bfe9645b29ddddb746600e961ad Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Mon, 18 Jul 2022 16:18:42 +0200 Subject: [PATCH 04/12] Keep Hyperdrive Job Type in Job Instance --- .../rest/services/JobTemplateResolutionService.scala | 11 +++++++++++ .../executors/spark/HyperdriveExecutor.scala | 1 - .../rest/utils/JobTemplateResolutionServiceTest.scala | 4 ++-- .../runs/run-detail/run-detail.component.html | 2 +- 4 files changed, 14 insertions(+), 4 deletions(-) diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/JobTemplateResolutionService.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/JobTemplateResolutionService.scala index cb4e76a1e..92b790d7e 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/JobTemplateResolutionService.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/JobTemplateResolutionService.scala @@ -19,6 +19,8 @@ import org.springframework.stereotype.Service import za.co.absa.hyperdrive.trigger.configuration.application.JobDefinitionConfig.{SparkExtraJavaOptions, SparkTags} import za.co.absa.hyperdrive.trigger.models._ import za.co.absa.hyperdrive.trigger.api.rest.utils.Extensions.{SparkConfigList, SparkConfigMap} +import za.co.absa.hyperdrive.trigger.models.enums.JobTypes +import za.co.absa.hyperdrive.trigger.models.enums.JobTypes.JobType import scala.util.{Failure, Success, Try} @@ -99,6 +101,7 @@ class JobTemplateResolutionServiceImpl extends JobTemplateResolutionService { templateParams: SparkTemplateParameters ): SparkInstanceParameters = SparkInstanceParameters( + jobType = mergeSparkJobType(definitionParams.jobType, templateParams.jobType), jobJar = mergeOptionString(definitionParams.jobJar, templateParams.jobJar), mainClass = mergeOptionString(definitionParams.mainClass, templateParams.mainClass), appArguments = mergeLists(definitionParams.appArguments, templateParams.appArguments), @@ -111,6 +114,14 @@ class JobTemplateResolutionServiceImpl extends JobTemplateResolutionService { ).toAdditionalSparkConfigList ) + private def mergeSparkJobType(definitionJobType: JobType, templateJobType: JobType) = { + if (definitionJobType == templateJobType) { + definitionJobType + } else { + JobTypes.Spark + } + } + private def mergeShellParameters( definitionParams: ShellDefinitionParameters, templateParams: ShellTemplateParameters diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala index bf0190754..ce8a0ce50 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala @@ -19,7 +19,6 @@ import za.co.absa.hyperdrive.trigger.api.rest.services.HyperdriveOffsetCompariso import za.co.absa.hyperdrive.trigger.configuration.application.SparkConfig import za.co.absa.hyperdrive.trigger.models.enums.JobStatuses import za.co.absa.hyperdrive.trigger.models.{JobInstance, SparkInstanceParameters} -import za.co.absa.hyperdrive.trigger.scheduler.executors.spark.HyperdriveExecutor.submitJob import scala.concurrent.{ExecutionContext, Future} diff --git a/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/utils/JobTemplateResolutionServiceTest.scala b/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/utils/JobTemplateResolutionServiceTest.scala index 8daead0db..dc4f8c597 100644 --- 
a/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/utils/JobTemplateResolutionServiceTest.scala +++ b/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/utils/JobTemplateResolutionServiceTest.scala @@ -282,7 +282,7 @@ class JobTemplateResolutionServiceTest extends FlatSpec with Matchers { ) // then - templateDefined.head.jobParameters.jobType shouldBe JobTypes.Spark + templateDefined.head.jobParameters.jobType shouldBe JobTypes.Hyperdrive templateDefined.head.jobParameters .asInstanceOf[SparkInstanceParameters] .jobJar shouldBe sparkTemplateParametersDefined.jobJar @@ -302,7 +302,7 @@ class JobTemplateResolutionServiceTest extends FlatSpec with Matchers { .asInstanceOf[SparkInstanceParameters] .additionalSparkConfig should contain theSameElementsAs sparkTemplateParametersDefined.additionalSparkConfig - bothScriptsDefined.head.jobParameters.jobType shouldBe JobTypes.Spark + bothScriptsDefined.head.jobParameters.jobType shouldBe JobTypes.Hyperdrive bothScriptsDefined.head.jobParameters .asInstanceOf[SparkInstanceParameters] .jobJar shouldBe sparkTemplateParametersDefined.jobJar diff --git a/ui/src/app/components/runs/run-detail/run-detail.component.html b/ui/src/app/components/runs/run-detail/run-detail.component.html index 2fd74107a..ab02f5994 100644 --- a/ui/src/app/components/runs/run-detail/run-detail.component.html +++ b/ui/src/app/components/runs/run-detail/run-detail.component.html @@ -45,7 +45,7 @@ - + {{jobInstance.jobStatus.name}} From d7f3f0922a298d41b445a99995052f5192998237 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Thu, 21 Jul 2022 15:02:44 +0200 Subject: [PATCH 05/12] Fix formatting --- .../scheduler/executors/Executors.scala | 14 +++++++--- .../executors/spark/HyperdriveExecutor.scala | 17 ++++++++---- .../spark/HyperdriveExecutorTest.scala | 27 ++++++++++++------- 3 files changed, 41 insertions(+), 17 deletions(-) diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala index f5463fe8b..8cac177ea 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala @@ -22,7 +22,13 @@ import za.co.absa.hyperdrive.trigger.models.{DagInstance, JobInstance, ShellInst import za.co.absa.hyperdrive.trigger.models.enums.JobStatuses.InvalidExecutor import za.co.absa.hyperdrive.trigger.models.enums.{DagInstanceStatuses, JobStatuses, JobTypes} import za.co.absa.hyperdrive.trigger.persistance.{DagInstanceRepository, JobInstanceRepository} -import za.co.absa.hyperdrive.trigger.scheduler.executors.spark.{HyperdriveExecutor, SparkClusterService, SparkEmrClusterServiceImpl, SparkExecutor, SparkYarnClusterServiceImpl} +import za.co.absa.hyperdrive.trigger.scheduler.executors.spark.{ + HyperdriveExecutor, + SparkClusterService, + SparkEmrClusterServiceImpl, + SparkExecutor, + SparkYarnClusterServiceImpl +} import org.slf4j.LoggerFactory import org.springframework.beans.factory.BeanFactory import za.co.absa.hyperdrive.trigger.scheduler.executors.shell.ShellExecutor @@ -42,7 +48,7 @@ class Executors @Inject() ( beanFactory: BeanFactory, implicit val sparkConfig: SparkConfig, schedulerConfig: SchedulerConfig, - hyperdriveOffsetComparisonService: HyperdriveOffsetComparisonService, + hyperdriveOffsetComparisonService: HyperdriveOffsetComparisonService ) { private val logger = LoggerFactory.getLogger(this.getClass) private implicit val executionContext: 
ExecutionContextExecutor = @@ -95,7 +101,9 @@ class Executors @Inject() ( jobInstance match { case Some(ji) => ji.jobParameters match { - case hyperdrive: SparkInstanceParameters if hyperdrive.jobType == JobTypes.Hyperdrive => HyperdriveExecutor.execute(ji, hyperdrive, updateJob, sparkClusterService, hyperdriveOffsetComparisonService) + case hyperdrive: SparkInstanceParameters if hyperdrive.jobType == JobTypes.Hyperdrive => + HyperdriveExecutor + .execute(ji, hyperdrive, updateJob, sparkClusterService, hyperdriveOffsetComparisonService) case spark: SparkInstanceParameters => SparkExecutor.execute(ji, spark, updateJob, sparkClusterService) case shell: ShellInstanceParameters => ShellExecutor.execute(ji, shell, updateJob) case _ => updateJob(ji.copy(jobStatus = InvalidExecutor)) diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala index ce8a0ce50..d255c4018 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala @@ -31,15 +31,22 @@ object HyperdriveExecutor { offsetComparisonService: HyperdriveOffsetComparisonService )(implicit executionContext: ExecutionContext, sparkConfig: SparkConfig): Future[Unit] = jobInstance.executorJobId match { - case None => submitJob(sparkClusterService, offsetComparisonService, jobInstance, jobParameters, updateJob) - case Some(executorJobId) => SparkExecutor.updateJobStatus(executorJobId, jobInstance, updateJob, sparkClusterService) + case None => submitJob(sparkClusterService, offsetComparisonService, jobInstance, jobParameters, updateJob) + case Some(executorJobId) => + SparkExecutor.updateJobStatus(executorJobId, jobInstance, updateJob, sparkClusterService) } - private def submitJob(sparkClusterService: SparkClusterService, offsetComparisonService: HyperdriveOffsetComparisonService, jobInstance: JobInstance, jobParameters: SparkInstanceParameters, updateJob: JobInstance => Future[Unit])(implicit executionContext: ExecutionContext) = { + private def submitJob(sparkClusterService: SparkClusterService, + offsetComparisonService: HyperdriveOffsetComparisonService, + jobInstance: JobInstance, + jobParameters: SparkInstanceParameters, + updateJob: JobInstance => Future[Unit] + )(implicit executionContext: ExecutionContext) = { for { newJobRequired <- offsetComparisonService.isNewJobInstanceRequired(jobParameters) - _ <- if (newJobRequired) sparkClusterService.submitJob(jobInstance, jobParameters, updateJob) - else updateJob(jobInstance.copy(jobStatus = JobStatuses.NoData)) + _ <- + if (newJobRequired) sparkClusterService.submitJob(jobInstance, jobParameters, updateJob) + else updateJob(jobInstance.copy(jobStatus = JobStatuses.NoData)) } yield () } } diff --git a/src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutorTest.scala b/src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutorTest.scala index cb8ea967d..46c629af5 100644 --- a/src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutorTest.scala +++ b/src/test/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutorTest.scala @@ -1,4 +1,3 @@ - /* * Copyright 2018 ABSA Group Limited * @@ -45,12 +44,17 @@ class HyperdriveExecutorTest extends AsyncFlatSpec with MockitoSugar with Before 
val jobInstance = getJobInstance val jobInstanceParameters = jobInstance.jobParameters.asInstanceOf[SparkInstanceParameters] - when(offsetComparisonServiceMock.isNewJobInstanceRequired(any())(any())).thenReturn(Future{true}) - when(sparkClusterServiceMock.submitJob(any(), any(), any())).thenReturn(Future {(): Unit}) - when(updateJobStub.apply(any[JobInstance])).thenReturn(Future {(): Unit}) + when(offsetComparisonServiceMock.isNewJobInstanceRequired(any())(any())).thenReturn(Future { true }) + when(sparkClusterServiceMock.submitJob(any(), any(), any())).thenReturn(Future { (): Unit }) + when(updateJobStub.apply(any[JobInstance])).thenReturn(Future { (): Unit }) implicit val sparkConfig: SparkConfig = DefaultTestSparkConfig().yarn - val resultFut = HyperdriveExecutor.execute(jobInstance, jobInstanceParameters, updateJobStub, sparkClusterServiceMock, offsetComparisonServiceMock) + val resultFut = HyperdriveExecutor.execute(jobInstance, + jobInstanceParameters, + updateJobStub, + sparkClusterServiceMock, + offsetComparisonServiceMock + ) resultFut.map { _ => verify(sparkClusterServiceMock).submitJob(any(), any(), any()) @@ -63,12 +67,17 @@ class HyperdriveExecutorTest extends AsyncFlatSpec with MockitoSugar with Before val jobInstance = getJobInstance val jobInstanceParameters = jobInstance.jobParameters.asInstanceOf[SparkInstanceParameters] - when(offsetComparisonServiceMock.isNewJobInstanceRequired(any())(any())).thenReturn(Future{false}) - when(sparkClusterServiceMock.submitJob(any(), any(), any())).thenReturn(Future {(): Unit}) - when(updateJobStub.apply(any[JobInstance])).thenReturn(Future {(): Unit}) + when(offsetComparisonServiceMock.isNewJobInstanceRequired(any())(any())).thenReturn(Future { false }) + when(sparkClusterServiceMock.submitJob(any(), any(), any())).thenReturn(Future { (): Unit }) + when(updateJobStub.apply(any[JobInstance])).thenReturn(Future { (): Unit }) implicit val sparkConfig: SparkConfig = DefaultTestSparkConfig().yarn - val resultFut = HyperdriveExecutor.execute(jobInstance, jobInstanceParameters, updateJobStub, sparkClusterServiceMock, offsetComparisonServiceMock) + val resultFut = HyperdriveExecutor.execute(jobInstance, + jobInstanceParameters, + updateJobStub, + sparkClusterServiceMock, + offsetComparisonServiceMock + ) resultFut.map { _ => verify(sparkClusterServiceMock, never()).submitJob(any(), any(), any()) From d4258658df8e50a29a167bc79fa3c6afe0c30919 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Thu, 21 Jul 2022 15:27:27 +0200 Subject: [PATCH 06/12] Add property for testing: executors.enable.hyperdrive.executor, default true Check for special job argument: useHyperdriveExecutor --- .../configuration/application/SchedulerConfig.scala | 4 +++- .../trigger/scheduler/executors/Executors.scala | 9 ++++++++- .../configuration/application/TestSchedulerConfig.scala | 2 +- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/configuration/application/SchedulerConfig.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/configuration/application/SchedulerConfig.scala index 8eeabea23..627613e49 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/configuration/application/SchedulerConfig.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/configuration/application/SchedulerConfig.scala @@ -57,5 +57,7 @@ class Sensors( class Executors( @NotNull @Name("thread.pool.size") - val threadPoolSize: Int + val threadPoolSize: Int, + @DefaultValue(Array("true")) + val enableHyperdriveExecutor: Boolean ) 
diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala index 8cac177ea..75e8070aa 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala @@ -101,7 +101,8 @@ class Executors @Inject() ( jobInstance match { case Some(ji) => ji.jobParameters match { - case hyperdrive: SparkInstanceParameters if hyperdrive.jobType == JobTypes.Hyperdrive => + case hyperdrive: SparkInstanceParameters + if hyperdrive.jobType == JobTypes.Hyperdrive && useHyperExecutor(hyperdrive) => HyperdriveExecutor .execute(ji, hyperdrive, updateJob, sparkClusterService, hyperdriveOffsetComparisonService) case spark: SparkInstanceParameters => SparkExecutor.execute(ji, spark, updateJob, sparkClusterService) @@ -121,6 +122,12 @@ class Executors @Inject() ( fut } + private def useHyperExecutor(parameters: SparkInstanceParameters) = { + schedulerConfig.executors.enableHyperdriveExecutor && + parameters.jobType == JobTypes.Hyperdrive && + parameters.appArguments.contains("useHyperdriveExecutor") + } + private def updateJob(jobInstance: JobInstance): Future[Unit] = { logger.info( s"Job updated. ID = ${jobInstance.id} STATUS = ${jobInstance.jobStatus} EXECUTOR_ID = ${jobInstance.executorJobId}" diff --git a/src/test/scala/za/co/absa/hyperdrive/trigger/configuration/application/TestSchedulerConfig.scala b/src/test/scala/za/co/absa/hyperdrive/trigger/configuration/application/TestSchedulerConfig.scala index 8873c22b9..10ce845a2 100644 --- a/src/test/scala/za/co/absa/hyperdrive/trigger/configuration/application/TestSchedulerConfig.scala +++ b/src/test/scala/za/co/absa/hyperdrive/trigger/configuration/application/TestSchedulerConfig.scala @@ -33,6 +33,6 @@ object TestSchedulerConfig { autostart, lagThreshold, new Sensors(sensorsThreadPoolSize, sensorsChangedSensorsChunkQuerySize), - new Executors(executorsThreadPoolSize) + new Executors(executorsThreadPoolSize, true) ) } From 56262302064055ec4af264b366bea7b50c2e0bff Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Tue, 26 Jul 2022 14:16:34 +0200 Subject: [PATCH 07/12] Change job argument to useHyperdriveExecutor=true --- .../hyperdrive/trigger/scheduler/executors/Executors.scala | 2 +- .../scheduler/executors/spark/HyperdriveExecutor.scala | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala index 75e8070aa..66b132033 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala @@ -125,7 +125,7 @@ class Executors @Inject() ( private def useHyperExecutor(parameters: SparkInstanceParameters) = { schedulerConfig.executors.enableHyperdriveExecutor && parameters.jobType == JobTypes.Hyperdrive && - parameters.appArguments.contains("useHyperdriveExecutor") + parameters.appArguments.contains("useHyperdriveExecutor=true") } private def updateJob(jobInstance: JobInstance): Future[Unit] = { diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala index d255c4018..0b936e9f9 100644 --- 
a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/HyperdriveExecutor.scala @@ -15,6 +15,7 @@ package za.co.absa.hyperdrive.trigger.scheduler.executors.spark +import org.slf4j.LoggerFactory import za.co.absa.hyperdrive.trigger.api.rest.services.HyperdriveOffsetComparisonService import za.co.absa.hyperdrive.trigger.configuration.application.SparkConfig import za.co.absa.hyperdrive.trigger.models.enums.JobStatuses @@ -23,6 +24,8 @@ import za.co.absa.hyperdrive.trigger.models.{JobInstance, SparkInstanceParameter import scala.concurrent.{ExecutionContext, Future} object HyperdriveExecutor { + private val logger = LoggerFactory.getLogger(this.getClass) + def execute( jobInstance: JobInstance, jobParameters: SparkInstanceParameters, @@ -42,6 +45,7 @@ object HyperdriveExecutor { jobParameters: SparkInstanceParameters, updateJob: JobInstance => Future[Unit] )(implicit executionContext: ExecutionContext) = { + logger.debug("Using HyperdriveExecutor") for { newJobRequired <- offsetComparisonService.isNewJobInstanceRequired(jobParameters) _ <- From bfe1516a33144d26a221acc23e1a83f2ea21bb43 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Wed, 27 Jul 2022 14:36:39 +0200 Subject: [PATCH 08/12] Fix serialization of checkpoint offsets --- .../trigger/api/rest/services/CheckpointService.scala | 9 +++++---- .../services/HyperdriveOffsetComparisonService.scala | 7 ++++++- .../api/rest/services/CheckpointServiceTest.scala | 4 ++-- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointService.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointService.scala index 7b2742804..64ff8be0b 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointService.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointService.scala @@ -15,10 +15,10 @@ package za.co.absa.hyperdrive.trigger.api.rest.services -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.hadoop.security.UserGroupInformation +import org.json4s.jackson.Serialization +import org.json4s.{Formats, NoTypeHints} import org.slf4j.LoggerFactory import org.springframework.stereotype.Service import za.co.absa.hyperdrive.trigger.api.rest.utils.ScalaUtil.swap @@ -44,9 +44,9 @@ class HdfsParameters( @Service class CheckpointServiceImpl @Inject() (hdfsService: HdfsService) extends CheckpointService { private val logger = LoggerFactory.getLogger(this.getClass) - private val mapper = new ObjectMapper().registerModule(DefaultScalaModule) private val offsetsDirName = "offsets" private val commitsDirName = "commits" + private implicit val formats: Formats = Serialization.formats(NoTypeHints) /** * See org.apache.spark.sql.execution.streaming.HDFSMetadataLog @@ -101,11 +101,12 @@ class CheckpointServiceImpl @Inject() (hdfsService: HdfsService) extends Checkpo * and org.apache.spark.sql.kafka010.JsonUtils * for details on the assumed format */ + private def parseKafkaOffsetStream(lines: Iterator[String]): TopicPartitionOffsets = { val SERIALIZED_VOID_OFFSET = "-" def parseOffset(value: String): Option[TopicPartitionOffsets] = value match { case SERIALIZED_VOID_OFFSET => None - case json => Some(mapper.readValue(json, classOf[TopicPartitionOffsets])) + case json 
=> Some(Serialization.read[TopicPartitionOffsets](json)) } if (!lines.hasNext) { throw new IllegalStateException("Incomplete log file") diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetComparisonService.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetComparisonService.scala index 8b7bed734..f03044056 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetComparisonService.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetComparisonService.scala @@ -93,7 +93,12 @@ class HyperdriveOffsetComparisonServiceImpl @Inject() (sparkConfig: SparkConfig, getCheckpointOffsets(jobParameters, kafkaParametersOpt).map { case Some(checkpointOffsets) => val allConsumed = offsetsConsumed(checkpointOffsets, kafkaEndOffsets) - logger.info(s"All offsets consumed for topic ${kafkaParametersOpt.get._1}. Skipping job instance") + if (allConsumed) { + logger.info(s"All offsets consumed for topic ${kafkaParametersOpt.get._1}. Skipping job instance") + } else { + logger.debug(s"Some offsets haven't been consumed yet for topic ${kafkaParametersOpt.get._1}. Kafka offsets: ${kafkaEndOffsets}, " + + s"Checkpoint offsets: ${checkpointOffsets}") + } !allConsumed case _ => true } diff --git a/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointServiceTest.scala b/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointServiceTest.scala index 8f4a49ba3..e81567158 100644 --- a/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointServiceTest.scala +++ b/src/test/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointServiceTest.scala @@ -72,9 +72,9 @@ class CheckpointServiceTest extends FlatSpec with Matchers with BeforeAndAfter w result.size shouldBe 2 result.head._1 shouldBe "my.topic" - result.head._2 should contain theSameElementsAs Map("2" -> 2021, "1" -> 1021, "3" -> 3021, "0" -> 21) + result.head._2 should contain theSameElementsAs Map(2 -> 2021L, 1 -> 1021L, 3 -> 3021L, 0 -> 21L) result.toSeq(1)._1 shouldBe "my.other.topic" - result.toSeq(1)._2 should contain theSameElementsAs Map("0" -> 0) + result.toSeq(1)._2 should contain theSameElementsAs Map(0 -> 0L) } "getLatestOffsetFile" should "get the latest offset file, and it is committed" in { From 1561ec1ec37501fc1cea94cdf269b87f79ceb6bc Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Wed, 27 Jul 2022 14:39:08 +0200 Subject: [PATCH 09/12] Don't have unique consumer group id to avoid LRU Cache invalidations --- .../hyperdrive/trigger/api/rest/services/KafkaService.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaService.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaService.scala index 2a8f1fde4..b2f2ee3ef 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaService.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/KafkaService.scala @@ -36,6 +36,7 @@ trait KafkaService { @Service class KafkaServiceImpl @Inject() (generalConfig: GeneralConfig) extends KafkaService { private val logger = LoggerFactory.getLogger(this.getClass) + private val consumerUuid = randomUUID().toString private val kafkaConsumersCache = new ConcurrentLruCache[(Properties, Long), KafkaConsumer[String, String]]( generalConfig.kafkaConsumersCacheSize, createKafkaConsumer @@ -58,7 +59,7 @@ class 
KafkaServiceImpl @Inject() (generalConfig: GeneralConfig) extends KafkaSer } private def getOffsets(topic: String, properties: Properties, offsetFn: OffsetFunction): Map[Int, Long] = { - val groupId = s"hyperdrive-trigger-kafkaService-${randomUUID().toString}" + val groupId = s"hyperdrive-trigger-kafkaService-$consumerUuid" properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId) val consumer = kafkaConsumersCache.get((properties, Thread.currentThread().getId)) val partitionInfo = Option(consumer.partitionsFor(topic)).map(_.asScala).getOrElse(Seq()) From cf42c5ae3267040821a276a2a416683af82c3df4 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Wed, 27 Jul 2022 17:30:53 +0200 Subject: [PATCH 10/12] Give DagInstanceStatus Skipped if all job instances are in status NoData --- .../trigger/scheduler/executors/Executors.scala | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala index 66b132033..cf5848cee 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala @@ -82,6 +82,18 @@ class Executors @Inject() ( case _ => } fut + case jobInstances if jobInstances.forall(ji => ji.jobStatus.isFinalStatus && ji.jobStatus == JobStatuses.NoData) => + val updatedDagInstance = + dagInstance.copy(status = DagInstanceStatuses.Skipped, finished = Option(LocalDateTime.now())) + val fut = for { + _ <- dagInstanceRepository.update(updatedDagInstance) + } yield {} + fut.onComplete { + case Failure(exception) => + logger.error(s"Updating status failed for skipped run. Dag instance id = ${dagInstance.id}", exception) + case _ => + } + fut case jobInstances if jobInstances.forall(ji => ji.jobStatus.isFinalStatus && !ji.jobStatus.isFailed) => val updatedDagInstance = dagInstance.copy(status = DagInstanceStatuses.Succeeded, finished = Option(LocalDateTime.now())) From 188c7c8f77e334ac8bf2a2445ccec6cab68abf6e Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Wed, 27 Jul 2022 17:40:54 +0200 Subject: [PATCH 11/12] Fix format --- .../rest/services/HyperdriveOffsetComparisonService.scala | 6 ++++-- .../hyperdrive/trigger/scheduler/executors/Executors.scala | 3 ++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetComparisonService.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetComparisonService.scala index f03044056..c46823718 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetComparisonService.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HyperdriveOffsetComparisonService.scala @@ -96,8 +96,10 @@ class HyperdriveOffsetComparisonServiceImpl @Inject() (sparkConfig: SparkConfig, if (allConsumed) { logger.info(s"All offsets consumed for topic ${kafkaParametersOpt.get._1}. Skipping job instance") } else { - logger.debug(s"Some offsets haven't been consumed yet for topic ${kafkaParametersOpt.get._1}. Kafka offsets: ${kafkaEndOffsets}, " + - s"Checkpoint offsets: ${checkpointOffsets}") + logger.debug( + s"Some offsets haven't been consumed yet for topic ${kafkaParametersOpt.get._1}. 
Kafka offsets: ${kafkaEndOffsets}, " + s"Checkpoint offsets: ${checkpointOffsets}" ) } !allConsumed case _ => true diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala index cf5848cee..cecd49394 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/Executors.scala @@ -82,7 +82,8 @@ class Executors @Inject() ( case _ => } fut - case jobInstances if jobInstances.forall(ji => ji.jobStatus.isFinalStatus && ji.jobStatus == JobStatuses.NoData) => + case jobInstances + if jobInstances.forall(ji => ji.jobStatus.isFinalStatus && ji.jobStatus == JobStatuses.NoData) => val updatedDagInstance = dagInstance.copy(status = DagInstanceStatuses.Skipped, finished = Option(LocalDateTime.now())) val fut = for { From 8db28abdf4219e2d4d924144f8b63b1383802ee6 Mon Sep 17 00:00:00 2001 From: Kevin Wallimann Date: Wed, 3 Aug 2022 11:37:26 +0200 Subject: [PATCH 12/12] PR fixes --- README.md | 18 ++++++++++++++++++ .../api/rest/services/CheckpointService.scala | 1 - 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index a62ade20e..169ec501f 100644 --- a/README.md +++ b/README.md @@ -209,6 +209,24 @@ The Hadoop configuration directory needs to be added as the environment variable - To add the Hadoop configuration directory to the application classpath, in the file `/conf/catalina.properties`, append to the key `shared.loader` the hadoop conf dir, e.g. `shared.loader="/opt/hadoop"`. +### Symbolic links on user-defined files +With [Feature #700: Skip dag instance creation if no new message is available in Kafka](https://github.com/AbsaOSS/hyperdrive-trigger/issues/700), +the application needs to access files that are defined in job templates for Hyperdrive jobs. In particular, it needs +to access any files specified under `reader.option.kafka` to configure Kafka consumers, e.g. keystores, truststores +and keytabs, under the same path as the Spark job would see them. + +For example, a (resolved) job template may include +- Additional files: `/etc/config/keystore.jks#keystore.jks` +- App arguments: `reader.option.kafka.ssl.keystore.location=keystore.jks` + +In this case, `/etc/config/keystore.jks` obviously needs to exist to submit the job. Additionally, +`keystore.jks` needs to exist relative to the web application's working directory, so that the web application can access the file under the same path +as the Spark job would and create a Kafka consumer using the same configuration as the Spark job. This +may be achieved using symbolic links. + +For access to HDFS, `spark.yarn.keytab` and `spark.yarn.principal` from the application properties are used for authentication. +No symbolic links are required. + ## Embedded Tomcat For development purposes, hyperdrive-trigger can be executed as an application with an embedded tomcat. Please check out branch **feature/embedded-tomcat-2** to use it. 
diff --git a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointService.scala b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointService.scala index 64ff8be0b..1a250c79d 100644 --- a/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointService.scala +++ b/src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointService.scala @@ -101,7 +101,6 @@ class CheckpointServiceImpl @Inject() (hdfsService: HdfsService) extends Checkpo * and org.apache.spark.sql.kafka010.JsonUtils * for details on the assumed format */ - private def parseKafkaOffsetStream(lines: Iterator[String]): TopicPartitionOffsets = { val SERIALIZED_VOID_OFFSET = "-" def parseOffset(value: String): Option[TopicPartitionOffsets] = value match {
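
A note on the `for`-comprehension fix in PATCH 03: the PATCH 02 version used guards (`_ <- sparkClusterService.submitJob(...) if newJobRequired`) inside a `for` comprehension over `Future`. Guards desugar to `Future.withFilter`, which does not branch: the generator to their left is still evaluated (so the job would be submitted even when no new data is available), and whichever predicate is false fails the combined future with a `NoSuchElementException`. The minimal sketch below illustrates why the explicit `if`/`else` of PATCH 03 is needed; the service stand-ins (`isNewJobInstanceRequired`, `submitJob`, `markNoData`) are hypothetical simplifications, not the project's real signatures.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object GuardVsBranch extends App {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-ins for offsetComparisonService and sparkClusterService.
  def isNewJobInstanceRequired(): Future[Boolean] = Future.successful(false)
  def submitJob(): Future[Unit] = Future(println("job submitted"))
  def markNoData(): Future[Unit] = Future(println("job marked as NoData"))

  // PATCH 02 shape: the guard desugars to withFilter. submitJob() still runs
  // (its side effect happens), and the false predicate then fails the whole
  // future with a NoSuchElementException instead of selecting a branch.
  val broken: Future[Unit] = for {
    newJobRequired <- isNewJobInstanceRequired()
    _ <- submitJob() if newJobRequired
  } yield ()

  // PATCH 03 shape: branch explicitly, so exactly one of the two effects runs
  // and the future completes successfully.
  val fixed: Future[Unit] = for {
    newJobRequired <- isNewJobInstanceRequired()
    _ <- if (newJobRequired) submitJob() else markNoData()
  } yield ()

  println(Await.ready(broken, 5.seconds).value) // Some(Failure(java.util.NoSuchElementException: ...))
  println(Await.ready(fixed, 5.seconds).value)  // Some(Success(()))
}
```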
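
A note on the serialization fix in PATCH 08: `ObjectMapper.readValue(json, classOf[...])` erases the map's type arguments, so partition keys come back as `String` and offsets as boxed `Integer` regardless of the declared key/value types, whereas json4s's `Serialization.read` receives the full type through an implicit `Manifest` and converts keys and values. This is why the test expectations change from `"2" -> 2021` to `2 -> 2021L`. A small sketch of the difference, assuming `TopicPartitionOffsets` is `Map[String, Map[Int, Long]]` (the alias's actual definition is not shown in this diff):

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.json4s.{Formats, NoTypeHints}
import org.json4s.jackson.Serialization

object OffsetKeyTypes extends App {
  // Assumed shape of the alias: topic -> (partition -> offset).
  type TopicPartitionOffsets = Map[String, Map[Int, Long]]

  val json = """{"my.topic":{"0":21,"1":1021}}"""

  // Jackson with classOf: generics are erased, so the inner keys remain
  // Strings at runtime even though the static type says Int.
  val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
  val viaJackson = mapper.readValue(json, classOf[TopicPartitionOffsets])
  println(viaJackson("my.topic").get(0)) // None - the key is really the String "0"

  // json4s with a Manifest: keys are converted to Int and offsets to Long.
  implicit val formats: Formats = Serialization.formats(NoTypeHints)
  val viaJson4s = Serialization.read[TopicPartitionOffsets](json)
  println(viaJson4s("my.topic").get(0)) // Some(21)
}
```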