Merged

Changes from all commits (33):
a52ea07  Controller (jozefbakus, Feb 17, 2023)
bca2ff4  Service + Model (jozefbakus, Feb 17, 2023)
ccabdfd  Model (jozefbakus, Feb 17, 2023)
2b57086  KafkaService (jozefbakus, Feb 17, 2023)
520e6d7  CheckpointService (jozefbakus, Feb 17, 2023)
63a56fd  HyperdriveOffsetService (jozefbakus, Feb 17, 2023)
c4ab308  HyperdriveService (jozefbakus, Feb 17, 2023)
d1fa3af  Refactoring (jozefbakus, Feb 17, 2023)
85c0380  Backend refactoring (jozefbakus, Feb 21, 2023)
5cba289  Front end (jozefbakus, Feb 21, 2023)
4e9b246  Lint (jozefbakus, Feb 21, 2023)
6c6f3b6  Lint (jozefbakus, Feb 21, 2023)
c7fb250  Removed unused code (jozefbakus, Feb 22, 2023)
5afa29f  CheckpointService + tests (jozefbakus, Feb 22, 2023)
71eaa22  Empty line (jozefbakus, Feb 22, 2023)
c2d99a7  KafkaService + tests (jozefbakus, Feb 22, 2023)
01093e0  HyperdriveOffsetService + tests (jozefbakus, Feb 22, 2023)
449996a  HyperdriveOffsetService + tests (jozefbakus, Feb 22, 2023)
0891835  HyperdriveOffsetService + tests (jozefbakus, Feb 22, 2023)
a3a3d15  HyperdriveService + tests (jozefbakus, Feb 23, 2023)
bfcd428  UI tests (jozefbakus, Feb 23, 2023)
aac15c7  Formatting (jozefbakus, Feb 23, 2023)
54d2150  PR fixes (jozefbakus, Feb 24, 2023)
732738b  PR fixes (jozefbakus, Feb 24, 2023)
cd92e9c  PR fixes (jozefbakus, Feb 24, 2023)
b612fd4  PR fixes (jozefbakus, Feb 24, 2023)
b6bef1c  PR fixes (jozefbakus, Feb 24, 2023)
4fd8e76  Merge with Develop (jozefbakus, Feb 24, 2023)
7a4aa2e  Formatting (jozefbakus, Feb 24, 2023)
d765916  PR Fixes (jozefbakus, Mar 9, 2023)
8669327  PR Fixes (jozefbakus, Mar 9, 2023)
bf47a0e  PR Fixes (jozefbakus, Mar 9, 2023)
806a29c  PR Fixes (jozefbakus, Mar 9, 2023)
@@ -0,0 +1,32 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.hyperdrive.trigger.api.rest.controllers

import org.springframework.web.bind.annotation._
import za.co.absa.hyperdrive.trigger.api.rest.services.HyperdriveService
import za.co.absa.hyperdrive.trigger.models._

import java.util.concurrent.CompletableFuture
import javax.inject.Inject
import scala.compat.java8.FutureConverters._
import scala.concurrent.ExecutionContext.Implicits.global

@RestController
class HyperdriveController @Inject() (hyperdriveService: HyperdriveService) {
@GetMapping(path = Array("/hyperdrive/workflows/{id}/ingestionStatus"))
def getIngestionStatus(@PathVariable id: Long): CompletableFuture[Seq[IngestionStatus]] =
hyperdriveService.getIngestionStatus(id).toJava.toCompletableFuture
}
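
For orientation, a minimal sketch of the Future-to-CompletableFuture bridge the controller relies on (names below are illustrative, not part of this PR): Spring MVC resolves a returned CompletableFuture asynchronously, while the service layer keeps returning plain Scala Futures.

import java.util.concurrent.CompletableFuture

import scala.compat.java8.FutureConverters._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

object FutureBridgeSketch extends App {
  // stand-in for a service call such as hyperdriveService.getIngestionStatus(id)
  def serviceCall(): Future[Seq[String]] = Future(Seq("status"))

  // the same conversion as in HyperdriveController above
  val completable: CompletableFuture[Seq[String]] =
    serviceCall().toJava.toCompletableFuture

  println(completable.join()) // List(status)
}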
@@ -25,7 +25,7 @@ import org.springframework.stereotype.Service
import za.co.absa.hyperdrive.trigger.api.rest.utils.ScalaUtil.swap

import javax.inject.Inject
import scala.util.Try
import scala.util.{Success, Try}

trait CheckpointService {
type TopicPartitionOffsets = Map[String, Map[Int, Long]]
@@ -34,6 +34,10 @@ trait CheckpointService {
def getLatestOffsetFilePath(params: HdfsParameters)(
implicit ugi: UserGroupInformation
): Try[Option[(String, Boolean)]]

def getLatestCommittedOffset(params: HdfsParameters)(
implicit ugi: UserGroupInformation
): Try[Option[TopicPartitionOffsets]]
}

class HdfsParameters(
@@ -98,6 +102,16 @@ class CheckpointServiceImpl @Inject() (@Lazy hdfsService: HdfsService) extends C
}
}

override def getLatestCommittedOffset(
params: HdfsParameters
)(implicit ugi: UserGroupInformation): Try[Option[TopicPartitionOffsets]] = {
getLatestCommitBatchId(params.checkpointLocation).flatMap {
_.map { latestCommit =>
getOffsetsFromFile(new Path(s"${params.checkpointLocation}/$offsetsDirName/$latestCommit").toString)
}.getOrElse(Success(None))
}
}
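
  // For context, a minimal sketch of the checkpoint layout this method assumes
  // (Spark's standard structured-streaming checkpoint; the helper below is
  // illustrative, not part of this service):
  //   <checkpointLocation>/commits/0, 1, 2, ...  one file per committed batch
  //   <checkpointLocation>/offsets/0, 1, 2, ...  OffsetSeqLog entries, e.g.
  //     v1
  //     {"batchWatermarkMs":0,"batchTimestampMs":1677060000000}
  //     {"my.topic":{"0":1500,"1":1498}}
  // getLatestCommittedOffset reads offsets/<highest committed batch id>:
  private def latestCommittedBatchIdSketch(commitFileNames: Seq[String]): Option[Long] = {
    val ids = commitFileNames.flatMap(name => scala.util.Try(name.toLong).toOption)
    if (ids.isEmpty) None else Some(ids.max)
  }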

/**
* see org.apache.spark.sql.execution.streaming.OffsetSeqLog
* and org.apache.spark.sql.kafka010.JsonUtils
@@ -26,23 +26,28 @@ import org.springframework.context.annotation.Lazy
import org.springframework.stereotype.Service
import za.co.absa.hyperdrive.trigger.configuration.application.SparkConfig
import za.co.absa.hyperdrive.trigger.models.enums.JobTypes
import za.co.absa.hyperdrive.trigger.models.{JobInstanceParameters, SparkInstanceParameters}
import za.co.absa.hyperdrive.trigger.models.{BeginningEndOffsets, JobInstanceParameters, SparkInstanceParameters}

import java.util.Properties
import javax.inject.Inject
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

trait HyperdriveOffsetComparisonService {
trait HyperdriveOffsetService {
def isNewJobInstanceRequired(jobParameters: JobInstanceParameters)(implicit ec: ExecutionContext): Future[Boolean]

def getNumberOfMessagesLeft(jobParameters: JobInstanceParameters)(
implicit ec: ExecutionContext
): Future[Option[(String, Map[Int, Long])]]
}

@Service
@Lazy
class HyperdriveOffsetComparisonServiceImpl @Inject() (sparkConfig: SparkConfig,
@Lazy checkpointService: CheckpointService,
@Lazy userGroupInformationService: UserGroupInformationService,
kafkaService: KafkaService
) extends HyperdriveOffsetComparisonService {
class HyperdriveOffsetServiceImpl @Inject() (sparkConfig: SparkConfig,
@Lazy checkpointService: CheckpointService,
@Lazy userGroupInformationService: UserGroupInformationService,
kafkaService: KafkaService
) extends HyperdriveOffsetService {
private val logger = LoggerFactory.getLogger(this.getClass)
private val HyperdriveCheckpointKey = "writer.common.checkpoint.location"
private val HyperdriveKafkaTopicKey = "reader.kafka.topic"
@@ -52,6 +57,58 @@ class HyperdriveOffsetComparisonServiceImpl @Inject() (sparkConfig: SparkConfig,
private val ListDelimiter = ','
private val defaultDeserializer = "org.apache.kafka.common.serialization.StringDeserializer"

/**
* @param jobParameters Parameters for the job instance. Should contain at least
* - reader.kafka.topic
* - reader.kafka.brokers
* - writer.common.checkpoint.location
* @param ec ExecutionContext
* @return the topic name and, per partition, the number of messages not yet ingested.
*/
def getNumberOfMessagesLeft(
jobParameters: JobInstanceParameters
)(implicit ec: ExecutionContext): Future[Option[(String, Map[Int, Long])]] = {
val kafkaParametersOpt = getKafkaParameters(jobParameters)
val hdfsParametersOpt: Option[HdfsParameters] = getResolvedAppArguments(jobParameters).flatMap(getHdfsParameters)

if (kafkaParametersOpt.isEmpty) {
logger.debug(s"Kafka parameters were not found in job definition $jobParameters")
}

Future(
for {
kafkaParameters <- kafkaParametersOpt
hdfsParameters <- hdfsParametersOpt
} yield {
val kafkaOffsets = kafkaService.getBeginningEndOffsets(kafkaParameters._1, kafkaParameters._2)
kafkaOffsets match {
case BeginningEndOffsets(_, start, end) if start.nonEmpty && end.nonEmpty && start.keySet == end.keySet =>
val ugi = userGroupInformationService.loginUserFromKeytab(hdfsParameters.principal, hdfsParameters.keytab)
val hdfsOffsetsTry = checkpointService.getLatestCommittedOffset(hdfsParameters)(ugi).map(_.map(_.head._2))

hdfsOffsetsTry match {
case Failure(_) => None
case Success(hdfsOffsetsOption) =>
val messagesLeft = kafkaOffsets.beginningOffsets.map { case (partition, kafkaBeginningOffset) =>
val kafkaEndOffset = kafkaOffsets.endOffsets(partition)
val numberOfMessages = hdfsOffsetsOption.flatMap(_.get(partition)) match {
case Some(hdfsOffset) if hdfsOffset > kafkaEndOffset => kafkaEndOffset - hdfsOffset
case Some(hdfsOffset) if hdfsOffset > kafkaBeginningOffset => kafkaEndOffset - hdfsOffset
case Some(hdfsOffset) if hdfsOffset <= kafkaBeginningOffset => kafkaEndOffset - kafkaBeginningOffset
case None => kafkaEndOffset - kafkaBeginningOffset
}
partition -> numberOfMessages
}
Some((kafkaOffsets.topic, messagesLeft))
}
case _ =>
logger.warn(s"Inconsistent response from Kafka for topic: ${kafkaOffsets.topic}")
None
}
}
).map(_.flatten)
}
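
  // Standalone sketch of the per-partition arithmetic above (an illustrative
  // helper, not part of this service). A committed offset that has aged out of
  // Kafka's retention is clamped to the beginning offset; a committed offset
  // ahead of the end offset yields a negative count, surfacing the
  // inconsistency instead of hiding it.
  private def messagesLeftSketch(beginning: Long, end: Long, committed: Option[Long]): Long =
    committed match {
      case Some(c) if c > beginning => end - c
      case _                        => end - beginning
    }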

/**
* @param jobParameters Parameters for the job instance. Should contain at least
* - reader.kafka.topic
@@ -0,0 +1,83 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.hyperdrive.trigger.api.rest.services

import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service
import za.co.absa.hyperdrive.trigger.models.{IngestionStatus, TopicStatus}
import za.co.absa.hyperdrive.trigger.models.enums.JobTypes
import za.co.absa.hyperdrive.trigger.persistance.WorkflowRepository

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

trait HyperdriveService {
protected val workflowRepository: WorkflowRepository
protected val jobTemplateService: JobTemplateService
protected val hyperdriveOffsetService: HyperdriveOffsetService

def getIngestionStatus(id: Long)(implicit ec: ExecutionContext): Future[Seq[IngestionStatus]]
}

@Service
class HyperdriveServiceImpl(
override protected val workflowRepository: WorkflowRepository,
override protected val jobTemplateService: JobTemplateService,
override protected val hyperdriveOffsetService: HyperdriveOffsetService
) extends HyperdriveService {
private val logger = LoggerFactory.getLogger(this.getClass)

override def getIngestionStatus(id: Long)(implicit ec: ExecutionContext): Future[Seq[IngestionStatus]] = {
workflowRepository.getWorkflow(id).flatMap { workflow =>
jobTemplateService
.resolveJobTemplate(workflow.dagDefinitionJoined)
.flatMap(resolvedJobs =>
Future.sequence(
resolvedJobs.map {
case resolvedJob if resolvedJob.jobParameters.jobType == JobTypes.Hyperdrive =>
hyperdriveOffsetService.getNumberOfMessagesLeft(resolvedJob.jobParameters).transformWith {
case Failure(exception) =>
logger.error(s"Failed to get number of messages left to ingest for a workflow: $id", exception)
Future.successful(
IngestionStatus(
jobName = resolvedJob.name,
jobType = resolvedJob.jobParameters.jobType.name,
topicStatus = None
)
)
case Success(messagesLeftOpt) =>
Future.successful(
IngestionStatus(
jobName = resolvedJob.name,
jobType = resolvedJob.jobParameters.jobType.name,
topicStatus = messagesLeftOpt.map(messagesLeft => TopicStatus(messagesLeft._1, messagesLeft._2))
)
)
}
case resolvedJob =>
Future.successful(
IngestionStatus(
jobName = resolvedJob.name,
jobType = resolvedJob.jobParameters.jobType.name,
topicStatus = None
)
)
}
)
)
}
}
}
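
The transformWith above is a fallback pattern: a failed offset lookup degrades to a status without topic information instead of failing the whole response. A minimal standalone sketch of the same pattern follows (names assumed, not from this PR); Future.recover would also work when the failure does not need to be inspected and logged.

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}

// turn any failure of `f` into `fallback` instead of a failed Future
def withFallback[A](f: Future[A], fallback: A): Future[A] =
  f.transformWith {
    case Success(value) => Future.successful(value)
    case Failure(_)     => Future.successful(fallback)
  }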
@@ -22,6 +22,7 @@ import org.springframework.stereotype.Service
import org.springframework.util.ConcurrentLruCache
import za.co.absa.hyperdrive.trigger.api.rest.services.KafkaServiceImpl.{BeginningOffsets, EndOffsets, OffsetFunction}
import za.co.absa.hyperdrive.trigger.configuration.application.GeneralConfig
import za.co.absa.hyperdrive.trigger.models.BeginningEndOffsets

import java.util.Properties
import java.util.UUID.randomUUID
@@ -31,6 +32,7 @@ import scala.collection.JavaConverters._
trait KafkaService {
def getBeginningOffsets(topic: String, consumerProperties: Properties): Map[Int, Long]
def getEndOffsets(topic: String, consumerProperties: Properties): Map[Int, Long]
def getBeginningEndOffsets(topic: String, consumerProperties: Properties): BeginningEndOffsets
}

@Service
@@ -50,6 +52,14 @@ class KafkaServiceImpl @Inject() (generalConfig: GeneralConfig) extends KafkaSer
getOffsets(topic, consumerProperties, EndOffsets)
}

def getBeginningEndOffsets(topic: String, consumerProperties: Properties): BeginningEndOffsets = {
BeginningEndOffsets(
topic,
getOffsets(topic, consumerProperties, BeginningOffsets),

Collaborator:

This is a suggestion for future projects, because it would require reworking or unifying the entire architecture. I noticed that some services return Futures while others don't, even when they are blocking.

case class Offset(beginning: Long, end: Long)

def getBeginningEndOffset(topic: String, consumerProperties: Properties): Future[Map[Int, Offset]] =
  for {
    kafkaConsumer   <- consumerPool.getConsumer(consumerProperties)
    parts           <- listTopicPartitions(kafkaConsumer, topic)
    futureBeginning  = getBeginningOffsets(kafkaConsumer, parts)
    futureEnd        = getEndOffsets(kafkaConsumer, parts)
    begin           <- futureBeginning
    end             <- futureEnd
  } yield begin.map { case (part, bOff) => part -> Offset(bOff, end(part)) } // this could be done in a safer manner

This might improve performance.

Collaborator (Author):

I agree with you. Please create an issue for this one. In this PR I just followed the style that was already introduced.

Collaborator:

Will do!

getOffsets(topic, consumerProperties, EndOffsets)
)
}

def createKafkaConsumer(propertiesThreadId: (Properties, Long)): KafkaConsumer[String, String] = {
logger.info(
s"Creating new Kafka Consumer for thread id ${propertiesThreadId._2} and" +
@@ -0,0 +1,22 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.hyperdrive.trigger.models

case class BeginningEndOffsets(
topic: String,
beginningOffsets: Map[Int, Long],
endOffsets: Map[Int, Long]
)
@@ -0,0 +1,24 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.hyperdrive.trigger.models

case class IngestionStatus(
jobName: String,
jobType: String,
topicStatus: Option[TopicStatus]
)

case class TopicStatus(topic: String, messagesToIngest: Map[Int, Long])
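
For orientation, a hypothetical instance of these models (all values invented), which is what the new controller endpoint ultimately serializes:

val status = IngestionStatus(
  jobName = "ingest-my-topic",
  jobType = "Hyperdrive",
  topicStatus = Some(TopicStatus("my.topic", Map(0 -> 120L, 1 -> 98L)))
)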
@@ -34,7 +34,7 @@ import org.springframework.beans.factory.BeanFactory
import org.springframework.context.annotation.Lazy
import za.co.absa.hyperdrive.trigger.scheduler.executors.shell.ShellExecutor
import org.springframework.stereotype.Component
import za.co.absa.hyperdrive.trigger.api.rest.services.HyperdriveOffsetComparisonService
import za.co.absa.hyperdrive.trigger.api.rest.services.HyperdriveOffsetService
import za.co.absa.hyperdrive.trigger.configuration.application.{SchedulerConfig, SparkConfig}
import za.co.absa.hyperdrive.trigger.scheduler.notifications.NotificationSender

@@ -49,7 +49,7 @@ class Executors @Inject() (
beanFactory: BeanFactory,
implicit val sparkConfig: SparkConfig,
schedulerConfig: SchedulerConfig,
@Lazy hyperdriveOffsetComparisonService: HyperdriveOffsetComparisonService
@Lazy hyperdriveOffsetComparisonService: HyperdriveOffsetService
) {
private val logger = LoggerFactory.getLogger(this.getClass)
private implicit val executionContext: ExecutionContextExecutor =
@@ -16,7 +16,7 @@
package za.co.absa.hyperdrive.trigger.scheduler.executors.spark

import org.slf4j.LoggerFactory
import za.co.absa.hyperdrive.trigger.api.rest.services.HyperdriveOffsetComparisonService
import za.co.absa.hyperdrive.trigger.api.rest.services.HyperdriveOffsetService
import za.co.absa.hyperdrive.trigger.configuration.application.SparkConfig
import za.co.absa.hyperdrive.trigger.models.enums.JobStatuses
import za.co.absa.hyperdrive.trigger.models.{JobInstance, SparkInstanceParameters}
@@ -31,23 +31,23 @@ object HyperdriveExecutor {
jobParameters: SparkInstanceParameters,
updateJob: JobInstance => Future[Unit],
sparkClusterService: SparkClusterService,
offsetComparisonService: HyperdriveOffsetComparisonService
offsetService: HyperdriveOffsetService
)(implicit executionContext: ExecutionContext, sparkConfig: SparkConfig): Future[Unit] =
jobInstance.executorJobId match {
case None => submitJob(sparkClusterService, offsetComparisonService, jobInstance, jobParameters, updateJob)
case None => submitJob(sparkClusterService, offsetService, jobInstance, jobParameters, updateJob)
case Some(executorJobId) =>
SparkExecutor.updateJobStatus(executorJobId, jobInstance, updateJob, sparkClusterService)
}

private def submitJob(sparkClusterService: SparkClusterService,
offsetComparisonService: HyperdriveOffsetComparisonService,
offsetService: HyperdriveOffsetService,
jobInstance: JobInstance,
jobParameters: SparkInstanceParameters,
updateJob: JobInstance => Future[Unit]
)(implicit executionContext: ExecutionContext) = {
logger.debug("Using HyperdriveExecutor")
for {
newJobRequired <- offsetComparisonService.isNewJobInstanceRequired(jobParameters)
newJobRequired <- offsetService.isNewJobInstanceRequired(jobParameters)
_ <-
if (newJobRequired) sparkClusterService.submitJob(jobInstance, jobParameters, updateJob)
else updateJob(jobInstance.copy(jobStatus = JobStatuses.NoData))