Merged
37 commits
041d34e
Implement KafkaService
kevinwallimann Jun 16, 2022
8e175c9
Implement KafkaService
kevinwallimann Jun 16, 2022
8166509
formatting
kevinwallimann Jun 16, 2022
3e2d88c
wip
kevinwallimann Jun 20, 2022
55f02f2
wip
kevinwallimann Jun 22, 2022
94c0731
rename to HyperdriveOffsetComparisonService, add tests
kevinwallimann Jun 22, 2022
3a4e8d7
fix tests
kevinwallimann Jun 22, 2022
845f6f5
fix formatting
kevinwallimann Jun 22, 2022
e2e7d4d
make methods private, remove unnecessary stuff
kevinwallimann Jun 23, 2022
7661d2a
Don't import hyperdrive-ingestor
kevinwallimann Jun 24, 2022
12215d2
Merge branch 'develop' into feature/687-hdfs-service
kevinwallimann Jun 28, 2022
69b0403
Exclude conflicting dependency
kevinwallimann Jun 28, 2022
9ce0843
Fix format
kevinwallimann Jun 28, 2022
d9ff9e2
PR fixes
kevinwallimann Jun 30, 2022
bc59081
Use futures, todo: return false if kafka topic doesn't exist
kevinwallimann Jul 1, 2022
a7cf78f
return false if kafka topic doesn't exist. Add tests
kevinwallimann Jul 13, 2022
7955475
scalafmt
kevinwallimann Jul 13, 2022
eb57234
scalafmt
kevinwallimann Jul 13, 2022
da0177e
Merge branch 'develop' into feature/687-hdfs-service
kevinwallimann Jul 13, 2022
15299f0
Remove temp file
kevinwallimann Jul 13, 2022
cddc675
login explicitly
kevinwallimann Jul 13, 2022
541f8d1
Change parameter type to JobInstanceParameters
kevinwallimann Jul 14, 2022
f2e7f45
PR fix: Limit number of kafka consumers
kevinwallimann Jul 15, 2022
97d8b89
PR fix: Refactor HdfsService -> CheckpointService
kevinwallimann Jul 15, 2022
6770f85
PR fix: Make methods private
kevinwallimann Jul 18, 2022
013e825
Add comments / logging
kevinwallimann Jul 18, 2022
70a8013
Fix formatting
kevinwallimann Jul 18, 2022
c7f8309
Make kafka consumer cache per thread
kevinwallimann Jul 18, 2022
cd4c7f4
Add a default deserializer to read from kafka
kevinwallimann Jul 18, 2022
99769c6
fix formatting
kevinwallimann Jul 18, 2022
291e832
fix formatting 2
kevinwallimann Jul 18, 2022
c62bdaa
Add HdfsService, additional logging for kafka consumer
kevinwallimann Jul 20, 2022
b9c89e5
Merge branch 'develop' into feature/687-hdfs-service
kevinwallimann Jul 21, 2022
b04ac94
PR fix: Move parse method to HdfsService, add tests
kevinwallimann Jul 21, 2022
61ab80b
Undo change for testing
kevinwallimann Jul 21, 2022
08fea77
Add comment
kevinwallimann Jul 21, 2022
8fb2aad
Fix formatting
kevinwallimann Jul 21, 2022
4 changes: 4 additions & 0 deletions README.md
@@ -130,6 +130,10 @@ spring.mail.host=
spring.mail.port=
```
```
# Kafka Service properties. Used for per-workflow Kafka consumers
kafka.consumers.cache.size=50
```
```
#Kafka sensor properties. Not all are required. Adjust according to your use case.
kafkaSource.group.id.prefix=hyper_drive_${appUniqueId}
kafkaSource.poll.duration=500
17 changes: 17 additions & 0 deletions pom.xml
@@ -75,6 +75,7 @@
<postgresql>42.3.3</postgresql>
<kafka.version>2.2.0</kafka.version>
<embedded.kafka.version>2.2.0</embedded.kafka.version>
<commons.configuration2.version>2.7</commons.configuration2.version> <!-- Same as in Hyperdrive -->
<play-json.version>2.7.3</play-json.version>
<play-ws-standalone.version>2.0.3</play-ws-standalone.version>
<play-ahc-ws-standalone.version>2.0.2</play-ahc-ws-standalone.version>
@@ -108,6 +109,7 @@
<commons-validator.version>1.7</commons-validator.version>
<hibernate.validator.version>6.2.0.Final</hibernate.validator.version>
<aws.java.sdk.bom.version>1.12.29</aws.java.sdk.bom.version>
<absa.commons.version>1.0.3</absa.commons.version>

<!-- Deployment -->
<gpg.plugin.version>1.6</gpg.plugin.version>
@@ -184,6 +186,11 @@
<artifactId>kafka_${scala.compat.version}</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-configuration2</artifactId>
<version>${commons.configuration2.version}</version>
</dependency>

<!-- Dependencies for Spark InProcessLauncher -->
<dependency>
@@ -243,6 +250,10 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- End Dependencies for Spark InProcessLauncher -->
@@ -394,6 +405,12 @@
<version>${embedded.kafka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>za.co.absa.commons</groupId>
<artifactId>commons_${scala.compat.version}</artifactId>
<version>${absa.commons.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<resources>
3 changes: 3 additions & 0 deletions src/main/resources/application.properties
@@ -70,6 +70,9 @@ notification.sender.address=
spring.mail.host=
spring.mail.port=

# Kafka service properties.
kafka.consumers.cache.size=50

#Kafka sensor properties.
kafkaSource.group.id.prefix=hyper_drive
kafkaSource.poll.duration=500
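The `kafka.consumers.cache.size` property above bounds the per-workflow Kafka consumers kept around per thread (cf. the commits "PR fix: Limit number of kafka consumers" and "Make kafka consumer cache per thread"). A minimal sketch of such a cache under those assumptions — class and member names here are hypothetical, not the code in this PR:

```scala
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer

// Hypothetical sketch of a per-thread, size-bounded consumer cache.
// An access-ordered LinkedHashMap acts as an LRU: once a thread's cache
// exceeds maxSize (e.g. kafka.consumers.cache.size=50), the least recently
// used consumer is closed and evicted.
class ConsumerCache(maxSize: Int) {
  type ByteConsumer = KafkaConsumer[Array[Byte], Array[Byte]]

  private val cache = new ThreadLocal[java.util.LinkedHashMap[String, ByteConsumer]] {
    override def initialValue(): java.util.LinkedHashMap[String, ByteConsumer] =
      new java.util.LinkedHashMap[String, ByteConsumer](16, 0.75f, true) {
        override def removeEldestEntry(eldest: java.util.Map.Entry[String, ByteConsumer]): Boolean = {
          val evict = size() > maxSize
          if (evict) eldest.getValue.close() // close before dropping the reference
          evict
        }
      }
  }

  // Returns this thread's consumer for the given key, creating it if absent.
  // consumerProperties is assumed to contain bootstrap servers and deserializers.
  def getOrCreate(key: String, consumerProperties: Properties): ByteConsumer = {
    val consumers = cache.get()
    Option(consumers.get(key)).getOrElse {
      val consumer = new ByteConsumer(consumerProperties)
      consumers.put(key, consumer)
      consumer
    }
  }
}
```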
152 changes: 152 additions & 0 deletions src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/CheckpointService.scala
@@ -0,0 +1,152 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.hyperdrive.trigger.api.rest.services

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.security.UserGroupInformation
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service
import za.co.absa.hyperdrive.trigger.api.rest.utils.ScalaUtil.swap

import javax.inject.Inject
import scala.util.Try

trait CheckpointService {
type TopicPartitionOffsets = Map[String, Map[Int, Long]]

def getOffsetsFromFile(path: String)(implicit ugi: UserGroupInformation): Try[Option[TopicPartitionOffsets]]
def getLatestOffsetFilePath(params: HdfsParameters)(implicit
ugi: UserGroupInformation
): Try[Option[(String, Boolean)]]
}

class HdfsParameters(
val keytab: String,
val principal: String,
val checkpointLocation: String
)

@Service
class CheckpointServiceImpl @Inject() (hdfsService: HdfsService) extends CheckpointService {
private val logger = LoggerFactory.getLogger(this.getClass)
private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
private val offsetsDirName = "offsets"
private val commitsDirName = "commits"

/**
* See org.apache.spark.sql.execution.streaming.HDFSMetadataLog
*/
private val batchFilesFilter = new PathFilter {
override def accept(path: Path): Boolean = {
try {
path.getName.toLong
true
} catch {
case _: NumberFormatException =>
false
}
}
}

override def getOffsetsFromFile(
path: String
)(implicit ugi: UserGroupInformation): Try[Option[TopicPartitionOffsets]] = {
hdfsService.parseFileAndClose(path, parseKafkaOffsetStream)
}

/**
* @return an Option of a (String, Boolean) pair. The string is the path to the latest offset file; the
* boolean is true if the offset is committed (i.e. a corresponding commit file exists) and false otherwise.
* None is returned if no offset file exists, in which case the corresponding commit file is assumed
* not to exist either.
*/
override def getLatestOffsetFilePath(
params: HdfsParameters
)(implicit ugi: UserGroupInformation): Try[Option[(String, Boolean)]] = {
getLatestOffsetBatchId(params.checkpointLocation).flatMap { offsetBatchIdOpt =>
val offsetFilePath = offsetBatchIdOpt.map { offsetBatchId =>
getLatestCommitBatchId(params.checkpointLocation).map { commitBatchIdOpt =>
val committed = commitBatchIdOpt match {
case Some(commitBatchId) => offsetBatchId == commitBatchId
case None => false
}
val path = new Path(s"${params.checkpointLocation}/${offsetsDirName}/${offsetBatchId}")
(path.toString, committed)
}
}
if (offsetFilePath.isEmpty) {
logger.debug(s"No offset files exist under checkpoint location ${params.checkpointLocation}")
}
swap(offsetFilePath)
}
}

/**
* see org.apache.spark.sql.execution.streaming.OffsetSeqLog
* and org.apache.spark.sql.kafka010.JsonUtils
* for details on the assumed format
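* Example of the assumed layout of an offset file (illustrative values):
* {{{
* v1
* {"batchWatermarkMs":0,"batchTimestampMs":1531792425005}
* {"my-topic":{"0":42,"1":17}}
* }}}
* The first line is the version and the second the stream metadata; both are skipped below.
* Each following line holds one source's offsets as topic -> partition -> offset JSON,
* or "-" for a void offset.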
*/
private def parseKafkaOffsetStream(lines: Iterator[String]): TopicPartitionOffsets = {
val SERIALIZED_VOID_OFFSET = "-"
def parseOffset(value: String): Option[TopicPartitionOffsets] = value match {
case SERIALIZED_VOID_OFFSET => None
case json => Some(mapper.readValue(json, classOf[TopicPartitionOffsets]))
}
if (!lines.hasNext) {
throw new IllegalStateException("Incomplete log file")
}

lines.next() // skip version
lines.next() // skip metadata
lines
.map(parseOffset)
.filter(_.isDefined)
.map(_.get)
.toSeq
.head
}

private def getLatestCommitBatchId(checkpointDir: String)(implicit ugi: UserGroupInformation): Try[Option[Long]] = {
val commitsDir = new Path(s"$checkpointDir/$commitsDirName")
getLatestBatchId(commitsDir)
}

private def getLatestOffsetBatchId(checkpointDir: String)(implicit ugi: UserGroupInformation): Try[Option[Long]] = {
val offsetsDir = new Path(s"$checkpointDir/$offsetsDirName")
getLatestBatchId(offsetsDir)
}

private def getLatestBatchId(path: Path)(implicit ugi: UserGroupInformation): Try[Option[Long]] = {
hdfsService.exists(path).flatMap { exists =>
if (exists) {
hdfsService.listStatus(path, batchFilesFilter).map { statuses =>
statuses
.map { status =>
status.getPath.getName.toLong
}
.sorted
.lastOption

}
} else {
logger.debug(s"Could not find path $path")
Try(None)
}
}
}
}
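A hedged usage sketch of the service above — principal, keytab, and checkpoint location are placeholders, HdfsServiceImpl is the implementation added below, and Spring wiring is bypassed for brevity:

```scala
import org.apache.hadoop.security.UserGroupInformation
import scala.util.{Success, Try}

object CheckpointServiceUsageSketch {
  def latestOffsets(): Try[Option[Map[String, Map[Int, Long]]]] = {
    val checkpointService = new CheckpointServiceImpl(new HdfsServiceImpl)

    // Log in explicitly (cf. the "login explicitly" commit) so that HDFS
    // access runs as the workflow's principal
    implicit val ugi: UserGroupInformation = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
      "workflow@EXAMPLE.COM",                 // placeholder principal
      "/etc/security/keytabs/workflow.keytab" // placeholder keytab
    )

    val params = new HdfsParameters(
      keytab = "/etc/security/keytabs/workflow.keytab",
      principal = "workflow@EXAMPLE.COM",
      checkpointLocation = "/checkpoints/my-workflow" // placeholder
    )

    for {
      latest <- checkpointService.getLatestOffsetFilePath(params)
      offsets <- latest match {
        // the ignored Boolean tells whether the latest offset batch is committed
        case Some((path, _)) => checkpointService.getOffsetsFromFile(path)
        case None            => Success(None)
      }
    } yield offsets
  }
}
```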
116 changes: 116 additions & 0 deletions src/main/scala/za/co/absa/hyperdrive/trigger/api/rest/services/HdfsService.scala
@@ -0,0 +1,116 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.hyperdrive.trigger.api.rest.services

import org.apache.commons.io.IOUtils
import org.apache.hadoop.fs._
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.deploy.SparkHadoopUtil
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service

import java.nio.charset.StandardCharsets.UTF_8
import java.security.PrivilegedExceptionAction
import scala.io.Source
import scala.util.Try

trait HdfsService {
def exists(path: Path)(implicit ugi: UserGroupInformation): Try[Boolean]
def open(path: Path)(implicit ugi: UserGroupInformation): Try[FSDataInputStream]
def listStatus(path: Path, filter: PathFilter)(implicit ugi: UserGroupInformation): Try[Array[FileStatus]]
def parseFileAndClose[R](pathStr: String, parseFn: Iterator[String] => R)(implicit
ugi: UserGroupInformation
): Try[Option[R]]
}

@Service
class HdfsServiceImpl extends HdfsService {
private val logger = LoggerFactory.getLogger(this.getClass)
private lazy val conf = SparkHadoopUtil.get.conf
override def exists(path: Path)(implicit ugi: UserGroupInformation): Try[Boolean] = {
Try {
doAs {
fs.exists(path)
}
}
}

override def open(path: Path)(implicit ugi: UserGroupInformation): Try[FSDataInputStream] = {
Try {
doAs {
fs.open(path)
}
}
}

override def listStatus(path: Path, filter: PathFilter)(implicit
ugi: UserGroupInformation
): Try[Array[FileStatus]] = {
Try {
doAs {
fs.listStatus(path, filter)
}
}
}

/**
* @param pathStr path to the file as a string
* @param parseFn function that parses the file line by line. Caution: It must materialize the content,
* because the file is closed after the method completes. E.g. it must not return an iterator.
* @tparam R type of the parsed value
* @return None if the file doesn't exist, Some containing the parsed content otherwise
*/
override def parseFileAndClose[R](pathStr: String, parseFn: Iterator[String] => R)(implicit
ugi: UserGroupInformation
): Try[Option[R]] = {
for {
path <- Try(new Path(pathStr))
exists <- exists(path)
parseResult <-
if (exists) {
open(path).map { input =>
try {
val lines = Source.fromInputStream(input, UTF_8.name()).getLines()
Some(parseFn(lines))
} catch {
case e: Exception =>
// re-throw the exception with the log file path added
throw new Exception(s"Failed to parse file $path", e)
} finally {
IOUtils.closeQuietly(input)
}
}

} else {
logger.debug(s"Could not find file $path")
Try(None)
}
} yield parseResult
}

/**
* Must not be a lazy val, because different users should get different FileSystems. FileSystem is cached internally.
*/
private def fs = FileSystem.get(conf)

private def doAs[T](fn: => T)(implicit ugi: UserGroupInformation) = {
ugi.doAs(new PrivilegedExceptionAction[T] {
override def run(): T = {
fn
}
})
}
}
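One detail of parseFileAndClose that is easy to miss: the parse function must materialize the lines before the stream is closed. A minimal sketch, with a placeholder path and the current user's UGI for simplicity:

```scala
import org.apache.hadoop.security.UserGroupInformation
import scala.util.Try

object ParseFileSketch {
  val hdfsService: HdfsService = new HdfsServiceImpl
  implicit val ugi: UserGroupInformation = UserGroupInformation.getCurrentUser

  // OK: toList materializes all lines while the input stream is still open
  val lineCount: Try[Option[Int]] =
    hdfsService.parseFileAndClose("/tmp/example.txt", _.toList.size)

  // Broken: returning the iterator itself would defer reading until after the
  // stream has been closed
  // val leaked = hdfsService.parseFileAndClose("/tmp/example.txt", identity)
}
```

Relatedly, `fs` being a `def` rather than a `lazy val` matters because `FileSystem.get` caches file systems per UGI internally: resolving it inside `doAs` returns the file system belonging to the calling user.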