18 changes: 18 additions & 0 deletions README.md
@@ -209,6 +209,24 @@ The Hadoop configuration directory needs to be added as the environment variable
- To add the Hadoop configuration directory to the application classpath,
in the file `<tomcat-base>/conf/catalina.properties`, append to the key `shared.loader` the hadoop conf dir, e.g. `shared.loader="/opt/hadoop"`.

### Symbolic links for user-defined files
With [Feature #700: Skip dag instance creation if no new message is available in Kafka](https://github.com/AbsaOSS/hyperdrive-trigger/issues/700),
the application needs to access files that are defined in job templates for Hyperdrive jobs. In particular, it needs
to access any files specified under `reader.option.kafka` to configure Kafka consumers, e.g. keystores, truststores
and keytabs, under the same path as the Spark job would see them.

For example, a (resolved) job template may include
- Additional files: `/etc/config/keystore.jks#keystore.jks`
- App arguments: `reader.option.kafka.ssl.keystore.location=keystore.jks`

In this case, `/etc/config/keystore.jks` obviously needs to exist in order to submit the job. Additionally,
`<tomcat-root-directory>/keystore.jks` needs to exist so that the web application can access the file under the same path
as the Spark job would, and can therefore create a Kafka consumer with the same configuration as the Spark job. This
can be achieved using symbolic links.
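For example, such a link could be created with `ln -s /etc/config/keystore.jks <tomcat-root-directory>/keystore.jks`.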

For access to HDFS, `spark.yarn.keytab` and `spark.yarn.principal` from the application properties are used for authentication.
No symbolic links are required.

## Embedded Tomcat

For development purposes, hyperdrive-trigger can be executed as an application with an embedded tomcat. Please check out branch **feature/embedded-tomcat-2** to use it.
@@ -15,10 +15,10 @@

package za.co.absa.hyperdrive.trigger.api.rest.services

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.security.UserGroupInformation
import org.json4s.jackson.Serialization
import org.json4s.{Formats, NoTypeHints}
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service
import za.co.absa.hyperdrive.trigger.api.rest.utils.ScalaUtil.swap
@@ -44,9 +44,9 @@ class HdfsParameters(
@Service
class CheckpointServiceImpl @Inject() (hdfsService: HdfsService) extends CheckpointService {
private val logger = LoggerFactory.getLogger(this.getClass)
private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
private val offsetsDirName = "offsets"
private val commitsDirName = "commits"
private implicit val formats: Formats = Serialization.formats(NoTypeHints)

/**
* See org.apache.spark.sql.execution.streaming.HDFSMetadataLog
@@ -105,7 +105,7 @@ class CheckpointServiceImpl @Inject() (hdfsService: HdfsService) extends Checkpo
val SERIALIZED_VOID_OFFSET = "-"
def parseOffset(value: String): Option[TopicPartitionOffsets] = value match {
case SERIALIZED_VOID_OFFSET => None
case json => Some(mapper.readValue(json, classOf[TopicPartitionOffsets]))
case json => Some(Serialization.read[TopicPartitionOffsets](json))
}
if (!lines.hasNext) {
throw new IllegalStateException("Incomplete log file")
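As a reading aid, a minimal, self-contained sketch of the json4s deserialization used above; the JSON line and the target type are illustrative assumptions (mirroring the test data further below), not the project's actual `TopicPartitionOffsets` model:

import org.json4s.{Formats, NoTypeHints}
import org.json4s.jackson.Serialization

object OffsetJsonSketch extends App {
  implicit val formats: Formats = Serialization.formats(NoTypeHints)
  // One line of a Spark offset log maps partition ids to offsets per topic.
  val json = """{"my.topic":{"0":21,"1":1021,"2":2021,"3":3021}}"""
  val offsets = Serialization.read[Map[String, Map[String, Long]]](json)
  assert(offsets("my.topic")("1") == 1021L)
}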
@@ -93,7 +93,14 @@ class HyperdriveOffsetComparisonServiceImpl @Inject() (sparkConfig: SparkConfig,
getCheckpointOffsets(jobParameters, kafkaParametersOpt).map {
case Some(checkpointOffsets) =>
val allConsumed = offsetsConsumed(checkpointOffsets, kafkaEndOffsets)
logger.info(s"All offsets consumed for topic ${kafkaParametersOpt.get._1}. Skipping job instance")
if (allConsumed) {
logger.info(s"All offsets consumed for topic ${kafkaParametersOpt.get._1}. Skipping job instance")
} else {
logger.debug(
s"Some offsets haven't been consumed yet for topic ${kafkaParametersOpt.get._1}. Kafka offsets: ${kafkaEndOffsets}, " +
s"Checkpoint offsets: ${checkpointOffsets}"
)
}
!allConsumed
case _ => true
}
@@ -19,6 +19,8 @@ import org.springframework.stereotype.Service
import za.co.absa.hyperdrive.trigger.configuration.application.JobDefinitionConfig.{SparkExtraJavaOptions, SparkTags}
import za.co.absa.hyperdrive.trigger.models._
import za.co.absa.hyperdrive.trigger.api.rest.utils.Extensions.{SparkConfigList, SparkConfigMap}
import za.co.absa.hyperdrive.trigger.models.enums.JobTypes
import za.co.absa.hyperdrive.trigger.models.enums.JobTypes.JobType

import scala.util.{Failure, Success, Try}

@@ -99,6 +101,7 @@ class JobTemplateResolutionServiceImpl extends JobTemplateResolutionService {
templateParams: SparkTemplateParameters
): SparkInstanceParameters =
SparkInstanceParameters(
jobType = mergeSparkJobType(definitionParams.jobType, templateParams.jobType),
jobJar = mergeOptionString(definitionParams.jobJar, templateParams.jobJar),
mainClass = mergeOptionString(definitionParams.mainClass, templateParams.mainClass),
appArguments = mergeLists(definitionParams.appArguments, templateParams.appArguments),
@@ -111,6 +114,14 @@ class JobTemplateResolutionServiceImpl extends JobTemplateResolutionService {
).toAdditionalSparkConfigList
)

private def mergeSparkJobType(definitionJobType: JobType, templateJobType: JobType) = {
if (definitionJobType == templateJobType) {
definitionJobType
} else {
JobTypes.Spark
}
}

private def mergeShellParameters(
definitionParams: ShellDefinitionParameters,
templateParams: ShellTemplateParameters
@@ -36,6 +36,7 @@ trait KafkaService {
@Service
class KafkaServiceImpl @Inject() (generalConfig: GeneralConfig) extends KafkaService {
private val logger = LoggerFactory.getLogger(this.getClass)
private val consumerUuid = randomUUID().toString
private val kafkaConsumersCache = new ConcurrentLruCache[(Properties, Long), KafkaConsumer[String, String]](
generalConfig.kafkaConsumersCacheSize,
createKafkaConsumer
@@ -58,7 +59,7 @@
}

private def getOffsets(topic: String, properties: Properties, offsetFn: OffsetFunction): Map[Int, Long] = {
val groupId = s"hyperdrive-trigger-kafkaService-${randomUUID().toString}"
val groupId = s"hyperdrive-trigger-kafkaService-$consumerUuid"
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
val consumer = kafkaConsumersCache.get((properties, Thread.currentThread().getId))
val partitionInfo = Option(consumer.partitionsFor(topic)).map(_.asScala).getOrElse(Seq())
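Note: deriving the consumer group id from a UUID created once per service instance (rather than per call) presumably keeps the consumer `Properties` stable across calls, so the `ConcurrentLruCache` keyed by (properties, thread id) can actually reuse cached Kafka consumers instead of creating a new one for every offset query.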
@@ -57,5 +57,7 @@ class Sensors(
class Executors(
@NotNull
@Name("thread.pool.size")
val threadPoolSize: Int
val threadPoolSize: Int,
@DefaultValue(Array("true"))
val enableHyperdriveExecutor: Boolean
)
@@ -37,6 +37,7 @@ object JobStatuses {
case object InvalidExecutor extends JobStatus("InvalidExecutor", true, true, false)
case object FailedPreviousJob extends JobStatus("FailedPreviousJob", true, true, false)
case object Skipped extends JobStatus("Skipped", true, false, false)
case object NoData extends JobStatus("No Data", true, false, false)

val statuses: Set[JobStatus] = Set(
InQueue,
@@ -49,7 +50,8 @@
SubmissionTimeout,
InvalidExecutor,
FailedPreviousJob,
Skipped
Skipped,
NoData
)
val finalStatuses: Set[JobStatus] = statuses.filter(_.isFinalStatus)
val nonFinalStatuses: Set[JobStatus] = statuses.filter(!_.isFinalStatus)
@@ -20,9 +20,10 @@ import java.util.concurrent
import javax.inject.Inject
import za.co.absa.hyperdrive.trigger.models.{DagInstance, JobInstance, ShellInstanceParameters, SparkInstanceParameters}
import za.co.absa.hyperdrive.trigger.models.enums.JobStatuses.InvalidExecutor
import za.co.absa.hyperdrive.trigger.models.enums.{DagInstanceStatuses, JobStatuses}
import za.co.absa.hyperdrive.trigger.models.enums.{DagInstanceStatuses, JobStatuses, JobTypes}
import za.co.absa.hyperdrive.trigger.persistance.{DagInstanceRepository, JobInstanceRepository}
import za.co.absa.hyperdrive.trigger.scheduler.executors.spark.{
HyperdriveExecutor,
SparkClusterService,
SparkEmrClusterServiceImpl,
SparkExecutor,
@@ -32,6 +33,7 @@ import org.slf4j.LoggerFactory
import org.springframework.beans.factory.BeanFactory
import za.co.absa.hyperdrive.trigger.scheduler.executors.shell.ShellExecutor
import org.springframework.stereotype.Component
import za.co.absa.hyperdrive.trigger.api.rest.services.HyperdriveOffsetComparisonService
import za.co.absa.hyperdrive.trigger.configuration.application.{SchedulerConfig, SparkConfig}
import za.co.absa.hyperdrive.trigger.scheduler.notifications.NotificationSender

@@ -45,7 +47,8 @@ class Executors @Inject() (
notificationSender: NotificationSender,
beanFactory: BeanFactory,
implicit val sparkConfig: SparkConfig,
schedulerConfig: SchedulerConfig
schedulerConfig: SchedulerConfig,
hyperdriveOffsetComparisonService: HyperdriveOffsetComparisonService
) {
private val logger = LoggerFactory.getLogger(this.getClass)
private implicit val executionContext: ExecutionContextExecutor =
@@ -79,6 +82,19 @@ class Executors @Inject() (
case _ =>
}
fut
case jobInstances
if jobInstances.forall(ji => ji.jobStatus.isFinalStatus && ji.jobStatus == JobStatuses.NoData) =>
val updatedDagInstance =
dagInstance.copy(status = DagInstanceStatuses.Skipped, finished = Option(LocalDateTime.now()))
val fut = for {
_ <- dagInstanceRepository.update(updatedDagInstance)
} yield {}
fut.onComplete {
case Failure(exception) =>
logger.error(s"Updating status failed for skipped run. Dag instance id = ${dagInstance.id}", exception)
case _ =>
}
fut
case jobInstances if jobInstances.forall(ji => ji.jobStatus.isFinalStatus && !ji.jobStatus.isFailed) =>
val updatedDagInstance =
dagInstance.copy(status = DagInstanceStatuses.Succeeded, finished = Option(LocalDateTime.now()))
@@ -98,6 +114,10 @@
jobInstance match {
case Some(ji) =>
ji.jobParameters match {
case hyperdrive: SparkInstanceParameters
if hyperdrive.jobType == JobTypes.Hyperdrive && useHyperExecutor(hyperdrive) =>
HyperdriveExecutor
.execute(ji, hyperdrive, updateJob, sparkClusterService, hyperdriveOffsetComparisonService)
case spark: SparkInstanceParameters => SparkExecutor.execute(ji, spark, updateJob, sparkClusterService)
case shell: ShellInstanceParameters => ShellExecutor.execute(ji, shell, updateJob)
case _ => updateJob(ji.copy(jobStatus = InvalidExecutor))
@@ -115,6 +135,12 @@
fut
}

private def useHyperExecutor(parameters: SparkInstanceParameters) = {
schedulerConfig.executors.enableHyperdriveExecutor &&
parameters.jobType == JobTypes.Hyperdrive &&
parameters.appArguments.contains("useHyperdriveExecutor=true")
}

private def updateJob(jobInstance: JobInstance): Future[Unit] = {
logger.info(
s"Job updated. ID = ${jobInstance.id} STATUS = ${jobInstance.jobStatus} EXECUTOR_ID = ${jobInstance.executorJobId}"
@@ -0,0 +1,56 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.hyperdrive.trigger.scheduler.executors.spark

import org.slf4j.LoggerFactory
import za.co.absa.hyperdrive.trigger.api.rest.services.HyperdriveOffsetComparisonService
import za.co.absa.hyperdrive.trigger.configuration.application.SparkConfig
import za.co.absa.hyperdrive.trigger.models.enums.JobStatuses
import za.co.absa.hyperdrive.trigger.models.{JobInstance, SparkInstanceParameters}

import scala.concurrent.{ExecutionContext, Future}

object HyperdriveExecutor {
private val logger = LoggerFactory.getLogger(this.getClass)

def execute(
jobInstance: JobInstance,
jobParameters: SparkInstanceParameters,
updateJob: JobInstance => Future[Unit],
sparkClusterService: SparkClusterService,
offsetComparisonService: HyperdriveOffsetComparisonService
)(implicit executionContext: ExecutionContext, sparkConfig: SparkConfig): Future[Unit] =
jobInstance.executorJobId match {
case None => submitJob(sparkClusterService, offsetComparisonService, jobInstance, jobParameters, updateJob)
case Some(executorJobId) =>
SparkExecutor.updateJobStatus(executorJobId, jobInstance, updateJob, sparkClusterService)
}

private def submitJob(sparkClusterService: SparkClusterService,
offsetComparisonService: HyperdriveOffsetComparisonService,
jobInstance: JobInstance,
jobParameters: SparkInstanceParameters,
updateJob: JobInstance => Future[Unit]
)(implicit executionContext: ExecutionContext) = {
logger.debug("Using HyperdriveExecutor")
for {
newJobRequired <- offsetComparisonService.isNewJobInstanceRequired(jobParameters)
_ <-
if (newJobRequired) sparkClusterService.submitJob(jobInstance, jobParameters, updateJob)
else updateJob(jobInstance.copy(jobStatus = JobStatuses.NoData))
} yield ()
}
}
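Taken together with the `Executors` changes above, the flow appears to be: before submitting a Hyperdrive job, the offset comparison service decides whether a new job instance is required; if all Kafka offsets have already been consumed according to the checkpoint, the job instance is set to the new `NoData` status instead of being submitted, and a dag instance whose jobs all ended in `NoData` is marked as `Skipped`.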
@@ -37,7 +37,7 @@ object SparkExecutor {
case Some(executorJobId) => updateJobStatus(executorJobId, jobInstance, updateJob, sparkClusterService)
}

private def updateJobStatus(
private[spark] def updateJobStatus(
executorJobId: String,
jobInstance: JobInstance,
updateJob: JobInstance => Future[Unit],
@@ -72,9 +72,9 @@ class CheckpointServiceTest extends FlatSpec with Matchers with BeforeAndAfter w

result.size shouldBe 2
result.head._1 shouldBe "my.topic"
result.head._2 should contain theSameElementsAs Map("2" -> 2021, "1" -> 1021, "3" -> 3021, "0" -> 21)
result.head._2 should contain theSameElementsAs Map(2 -> 2021L, 1 -> 1021L, 3 -> 3021L, 0 -> 21L)
result.toSeq(1)._1 shouldBe "my.other.topic"
result.toSeq(1)._2 should contain theSameElementsAs Map("0" -> 0)
result.toSeq(1)._2 should contain theSameElementsAs Map(0 -> 0L)
}

"getLatestOffsetFile" should "get the latest offset file, and it is committed" in {
@@ -282,7 +282,7 @@ class JobTemplateResolutionServiceTest extends FlatSpec with Matchers {
)

// then
templateDefined.head.jobParameters.jobType shouldBe JobTypes.Spark
templateDefined.head.jobParameters.jobType shouldBe JobTypes.Hyperdrive
templateDefined.head.jobParameters
.asInstanceOf[SparkInstanceParameters]
.jobJar shouldBe sparkTemplateParametersDefined.jobJar
@@ -302,7 +302,7 @@
.asInstanceOf[SparkInstanceParameters]
.additionalSparkConfig should contain theSameElementsAs sparkTemplateParametersDefined.additionalSparkConfig

bothScriptsDefined.head.jobParameters.jobType shouldBe JobTypes.Spark
bothScriptsDefined.head.jobParameters.jobType shouldBe JobTypes.Hyperdrive
bothScriptsDefined.head.jobParameters
.asInstanceOf[SparkInstanceParameters]
.jobJar shouldBe sparkTemplateParametersDefined.jobJar
@@ -33,6 +33,6 @@ object TestSchedulerConfig {
autostart,
lagThreshold,
new Sensors(sensorsThreadPoolSize, sensorsChangedSensorsChunkQuerySize),
new Executors(executorsThreadPoolSize)
new Executors(executorsThreadPoolSize, true)
)
}