Merged

18 commits
1648c9b
testrun fix (spark.testing.memory=1g)
dk1844 Aug 20, 2020
b1f5e17
spark-version update to v2.4.5, json4s update, aws sdk latest version…
dk1844 Aug 21, 2020
d0145ca
tests enabled in examples, order for the sample measurements tests in…
dk1844 Aug 21, 2020
ac00d10
Loader from s3 crudely works on local host by reading my SAML profile
dk1844 Aug 21, 2020
aa50232
Storer to s3 crudely works on local host by reading my SAML profile a…
dk1844 Aug 24, 2020
6ecf7f2
Unit tests for ControlMeasuresHdfsLoaderJsonFile | ControlMeasuresHdf…
dk1844 Aug 25, 2020
11b751b
Unit tests for ControlMeasuresS3LoaderJsonSpec
dk1844 Aug 27, 2020
89def5c
ScalaTest version update to 3.2.2, Mockito -> ScalaMockito
dk1844 Aug 28, 2020
003bb24
ControlMeasuresS3LoaderJsonSpec enhanced - now checking the GetObject…
dk1844 Aug 28, 2020
3e2125d
ControlMeasuresS3StorerJsonSpec added
dk1844 Aug 31, 2020
98f27b9
SampleS3Measurements1|2 now work with SAML profile and KMS Key ID loa…
dk1844 Aug 31, 2020
e993f78
scaladoc fix (reformat revert)
dk1844 Aug 31, 2020
6c51d92
scala doc touchups
dk1844 Aug 31, 2020
def1d73
storeCurrentInfoFile divided into HDFS and S3 version
dk1844 Sep 3, 2020
f609686
writeInfoFileForQueryForS3 vs writeInfoFileForQuery - based on used s…
dk1844 Sep 3, 2020
60c1be4
writeInfoFileForQueryForS3 vs writeInfoFileForQuery - fix
dk1844 Sep 4, 2020
5d051b9
SparkQueryExecutionListerner.onSuccess - only writing info with a def…
dk1844 Sep 7, 2020
3fa2198
isValidS3Path added to S3Utils, test updates
dk1844 Sep 7, 2020
61 changes: 54 additions & 7 deletions README.md
@@ -86,17 +86,17 @@ independently.

## Usage

#### Coordinate for Maven POM dependency
### Coordinate for Maven POM dependency

```xml
<dependency>
<groupId>za.co.absa</groupId>
<artifactId>atum</artifactId>
<version>0.2.6</version>
<version>0.3.0</version>
</dependency>
```

#### Initial info file generation example
### Initial info file generation example

Atum provides a method for initial creation of info files from a Spark dataframe. It can be used as is or can
serve as a reference implementation for calculating control measurements.
@@ -132,7 +132,7 @@ are not supported.
// The info file contents are available as a String object in strJson.
```

#### An ETL job example
### An ETL job example

For the full example please see the **SampleMeasurements1** and **SampleMeasurements2** objects from the *atum.examples* project.
It uses made-up Wikipedia data for computations. The source data has an info file containing the initial checkpoints,
@@ -183,13 +183,60 @@ In this example the data is read from 'data/input/mydata.csv' file. This data fi
in 'data/input/_INFO'. Two checkpoints are created. Any business logic can be inserted between reading the source data
and saving it to Parquet format.
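
A condensed sketch of the overall shape of such a job is shown below. It is illustrative only: the file paths, column handling, and checkpoint names are made up, and the real logic lives in the SampleMeasurements objects.

```scala
import org.apache.spark.sql.SparkSession
import za.co.absa.atum.AtumImplicits._

object SampleEtlSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ETL job with control measurements").getOrCreate()

    // Enable tracking and name the workflow; the destination info file path is omitted here,
    // so it will be inferred from the output data source
    spark.enableControlMeasuresTracking(sourceInfoFile = "data/input/_INFO")
      .setControlMeasuresWorkflow("Sample ETL")

    // First checkpoint right after reading the source data
    val input = spark.read.option("header", "true").csv("data/input/mydata.csv")
      .setCheckpoint("Source")

    // ... any business logic between reading and writing goes here ...
    val output = input // placeholder for real transformations

    // Second checkpoint before writing
    output.setCheckpoint("Conformance")

    // Saving triggers Atum's listener, which writes the output _INFO file automatically
    output.write.parquet("data/output/mydata")
  }
}
```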

### Atum library routines
### Storing Measurements in AWS S3
Starting with version 0.3.0, persistence support for AWS S3 has been added.
AWS S3 can be used both as a source to load the measurement data from and as a destination to save the measurements back to.

The following example demonstrates the setup:
```scala
import org.apache.spark.sql.SparkSession
import software.amazon.awssdk.auth.credentials.{AwsCredentialsProvider, DefaultCredentialsProvider, ProfileCredentialsProvider}
import za.co.absa.atum.AtumImplicits._
import za.co.absa.atum.persistence.{S3KmsSettings, S3Location}

object S3Example {
def main(args: Array[String]) {
val spark = SparkSession
.builder()
.appName("Example S3 Atum init showcase")
.getOrCreate()

// Here we use the default credentials provider, which relies on the default credentials provider chain to obtain credentials
// (e.g. when running on EMR/EC2 with the correct role assigned)
implicit val defaultCredentialsProvider: AwsCredentialsProvider = DefaultCredentialsProvider.create()
// Alternatively, one could pass a specific credentials provider. For example, using a local profile named "saml":
// implicit val samlCredentialsProvider = ProfileCredentialsProvider.create("saml")

val sourceS3Location: S3Location = S3Location("my-bucket123", "atum/input/my_amazing_measures.csv.info")

val kmsKeyId: String = "arn:aws:kms:eu-west-1:123456789012:key/12345678-90ab-cdef-1234-567890abcdef" // just example
val destinationS3Config: (S3Location, S3KmsSettings) = (
S3Location("my-bucket123", "atum/output/my_amazing_measures2.csv.info"),
S3KmsSettings(kmsKeyId)
)

import spark.implicits._

// Initializing library to hook up to Apache Spark with S3 persistence
spark.enableControlMeasuresTrackingForS3(
sourceS3Location = Some(sourceS3Location),
destinationS3Config = Some(destinationS3Config)
).setControlMeasuresWorkflow("A job with measurements saved to S3")
}
}

```
The rest of the processing logic and the programmatic approach to the library remain unchanged.
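
For illustration, a hypothetical continuation of the example above is sketched below; it would sit inside `main` after the tracking setup. The bucket, paths, and checkpoint names are made up, and the exact S3 path scheme (`s3://` vs `s3a://`) depends on your Spark/Hadoop deployment.

```scala
// Continuation of S3Example.main() above (illustrative paths and names)
val df = spark.read
  .option("header", "true")
  .csv("s3://my-bucket123/atum/input/my_amazing_measures.csv")
  .setCheckpoint("Source data loaded")

// ... business logic as usual ...

// On save, the registered S3 storer writes the destination _INFO file automatically
df.write.parquet("s3://my-bucket123/atum/output/my_amazing_measures2")
```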


## Atum library routines

The summary of common control framework routines you can use as Spark and Dataframe implicits is as follows:

| Routine | Description | Example usage |
| -------------- |:-------------------- |:---------------|
| enableControlMeasuresTracking(sourceInfoFile: *String*, destinationInfoFile: *String*) | Enable control measurements tracking. Source and destination info file paths can be omitted. If ommited, they will be automatically inferred from the input/output data sources. | spark.enableControlMeasurementsTracking() |
| enableControlMeasuresTracking(sourceInfoFile: *String*, destinationInfoFile: *String*) | Enable control measurements tracking. Source and destination info file paths can be omitted. If omitted, they will be automatically inferred from the input/output data sources. | spark.enableControlMeasurementsTracking() |
| enableControlMeasuresTrackingForS3(sourceS3Location: *Option[S3Location]*, destinationS3Config: *Option[(S3Location, S3KmsSettings)]*) | Enable control measurements tracking in S3. Source and destination parameters can be omitted. If omitted, the loading/storing part will not be used. | spark.enableControlMeasuresTrackingForS3(optionalSourceS3Location, optionalDestinationS3Config) |
| isControlMeasuresTrackingEnabled: *Boolean* | Returns true if control measurements tracking is enabled. | if (spark.isControlMeasuresTrackingEnabled) {/*do something*/} |
| disableControlMeasuresTracking() | Explicitly turn off control measurements tracking. | spark.disableControlMeasurementsTracking() |
| setCheckpoint(name: *String*) | Calculates the control measurements and appends a new checkpoint. | df.setCheckpoint("Conformance Started") |
@@ -204,7 +204,7 @@ The summary of common control framework routines you can use as Spark and Dataframe implicits is as follows:
| disableCaching() | Turns off caching that happens every time a checkpoint is generated. | disableCaching() |
| setCachingStorageLevel(cacheStorageLevel: *StorageLevel*) | Specifies a Spark storage level to use for caching. Can be one of the following: `NONE`, `DISK_ONLY`, `DISK_ONLY_2`, `MEMORY_ONLY`, `MEMORY_ONLY_2`, `MEMORY_ONLY_SER`, `MEMORY_ONLY_SER_2`, `MEMORY_AND_DISK`, `MEMORY_AND_DISK_2`, `MEMORY_AND_DISK_SER`, `MEMORY_AND_DISK_SER_2`, `OFF_HEAP`. | setCachingStorageLevel(StorageLevel.MEMORY_AND_DISK) |
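
As a quick orientation, the sketch below combines a few of the library routines; `df` and the output path are illustrative, and the snippet assumes tracking has already been enabled on the session.

```scala
import za.co.absa.atum.AtumImplicits._

// Guard Atum-specific calls so the same code also runs when tracking is disabled
if (spark.isControlMeasuresTrackingEnabled) {
  // Calculate measurements and record a named checkpoint on the current DataFrame
  df.setCheckpoint("Validation passed")

  // Explicitly write the current control measurements to a chosen location
  df.writeInfoFile("data/output/_INFO")
}
```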

### Control measurement types
## Control measurement types

The control measurement of a column is a hash sum. It can be calculated differently depending on the column's data type and
on business requirements. This table represents all currently supported measurement types:
17 changes: 16 additions & 1 deletion atum/pom.xml
@@ -24,7 +24,7 @@
<parent>
<groupId>za.co.absa</groupId>
<artifactId>atum-parent</artifactId>
<version>0.2.7-SNAPSHOT</version>
<version>0.3.0-SNAPSHOT</version>
</parent>


@@ -35,6 +35,21 @@
<version>${json4s.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-scala_${scala.compat.version}</artifactId>
<version>${mockito.scala.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-scala-scalatest_${scala.compat.version}</artifactId>
<version>${mockito.scala.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
50 changes: 41 additions & 9 deletions atum/src/main/scala/za/co/absa/atum/AtumImplicits.scala
@@ -15,12 +15,16 @@

package za.co.absa.atum

import scala.language.implicitConversions
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import za.co.absa.atum.core.Atum.controlFrameworkState
import za.co.absa.atum.core.{Atum, Constants, SparkEventListener, SparkQueryExecutionListener}
import za.co.absa.atum.core.{Atum, Constants}
import za.co.absa.atum.persistence._
import za.co.absa.atum.persistence.hdfs.{ControlMeasuresHdfsLoaderJsonFile, ControlMeasuresHdfsStorerJsonFile}
import za.co.absa.atum.persistence.s3.{ControlMeasuresS3LoaderJsonFile, ControlMeasuresS3StorerJsonFile}

import scala.language.implicitConversions

/**
* The object contains implicit methods for Control Framework
@@ -43,10 +47,12 @@ import za.co.absa.atum.persistence._
*
*/
object AtumImplicits {
type DefaultControlInfoStorer = ControlMeasuresStorerJsonFile
type DefaultControlInfoLoader = ControlMeasuresLoaderJsonFile
type DefaultControlInfoStorer = ControlMeasuresHdfsStorerJsonFile
type DefaultControlInfoLoader = ControlMeasuresHdfsLoaderJsonFile

implicit def StringToPath(path: String): Path = new Path(path)
implicit class StringPathExt(path: String) {
def toPath: Path = new Path(path)
}

/**
* The class contains implicit methods for [[org.apache.spark.sql.SparkSession]].
@@ -63,7 +69,7 @@ object AtumImplicits {
}

/**
* Enable control measurements tracking.
* Enable control measurements tracking on HDFS.
* Both input and output info file paths need to be provided
*
* Example info file path name: "data/input/wikidata.csv.info"
@@ -75,8 +81,29 @@
destinationInfoFile: String = ""): SparkSession = {
val hadoopConfiguration = sparkSession.sparkContext.hadoopConfiguration

val loader = if (sourceInfoFile.isEmpty) None else Some(new DefaultControlInfoLoader(hadoopConfiguration, sourceInfoFile))
val storer = if (destinationInfoFile.isEmpty) None else Some(new DefaultControlInfoStorer(hadoopConfiguration, destinationInfoFile))
val loader = if (sourceInfoFile.isEmpty) None else Some(new DefaultControlInfoLoader(hadoopConfiguration, sourceInfoFile.toPath))
val storer = if (destinationInfoFile.isEmpty) None else Some(new DefaultControlInfoStorer(hadoopConfiguration, destinationInfoFile.toPath))

enableControlMeasuresTracking(loader, storer)
}

/**
* Enable S3-based control measurements tracking.
*
* @param sourceS3Location S3 location to load the info file from
* @param destinationS3Config S3 location and KMS settings to save the measurements to
* @param credentialsProvider if you do not have a specific credentials provider, use the default
* {@link software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider#create()}
* @return the Spark session with Atum tracking enabled
*/
def enableControlMeasuresTrackingForS3(sourceS3Location: Option[S3Location],
destinationS3Config: Option[(S3Location, S3KmsSettings)])
(implicit credentialsProvider: AwsCredentialsProvider): SparkSession = {

val loader = sourceS3Location.map(new ControlMeasuresS3LoaderJsonFile(_))
val storer = destinationS3Config.map { case (destLoc, kms) =>
new ControlMeasuresS3StorerJsonFile(destLoc, kms)
}

enableControlMeasuresTracking(loader, storer)
}
@@ -286,7 +313,12 @@
* @param outputPath A directory or a file name to save the info file to.
*/
def writeInfoFile(outputPath: String): Dataset[Row] = {
Atum.controlFrameworkState.storeCurrentInfoFile(outputPath)
Atum.controlFrameworkState.storeCurrentInfoFileOnHdfs(outputPath.toPath)
dataset
}

def writeInfoFileOnS3(s3Location: S3Location, s3KmsSettings: S3KmsSettings)(implicit credentialsProvider: AwsCredentialsProvider): Dataset[Row] = {
Atum.controlFrameworkState.storeCurrentInfoFileOnS3(s3Location, s3KmsSettings)
dataset
}

6 changes: 6 additions & 0 deletions atum/src/main/scala/za/co/absa/atum/core/Accumulator.scala
@@ -60,6 +60,12 @@ class Accumulator() {
}
}

/**
* Allows viewing the storer if one has been set.
* @return the storer wrapped in Some, or None if no storer is loaded
*/
private[atum] def getStorer: Option[ControlMeasuresStorer] = if (isStorerLoaded) Some(storer) else None

This way, the accumulator internals are exposed to the rest of Atum so that it can behave differently depending on what kind of storer (HDFS vs S3) is used.


/**
* The method returns Control Info object in which checkpoints are sorted by calculation order.
*/
@@ -19,11 +19,14 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.storage.StorageLevel
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import za.co.absa.atum.AtumImplicits.DefaultControlInfoLoader
import za.co.absa.atum.core.Atum.log
import za.co.absa.atum.core.ControlType.Count
import za.co.absa.atum.model.{RunError, RunState, _}
import za.co.absa.atum.persistence.{ControlMeasuresLoader, ControlMeasuresStorer, ControlMeasuresStorerJsonFile}
import za.co.absa.atum.persistence.hdfs.ControlMeasuresHdfsStorerJsonFile
import za.co.absa.atum.persistence.s3.ControlMeasuresS3StorerJsonFile
import za.co.absa.atum.persistence.{ControlMeasuresLoader, ControlMeasuresStorer, S3KmsSettings, S3Location}
import za.co.absa.atum.plugins.EventListener
import za.co.absa.atum.utils.ExecutionPlanUtils.inferInputInfoFileName

@@ -245,15 +248,21 @@
}
}

private[atum] def storeCurrentInfoFile(outputHDFSPathFileName: Path, hadoopConfiguration: Configuration = sparkSession.sparkContext.hadoopConfiguration): Unit = {
private[atum] def storeCurrentInfoFileOnS3(s3Location: S3Location, s3KmsSettings: S3KmsSettings)(implicit credentialsProvider: AwsCredentialsProvider): Unit = {
val storer = new ControlMeasuresS3StorerJsonFile(s3Location, s3KmsSettings)
storer.store(accumulator.getControlMeasure)
Atum.log.info(s"Control measurements saved to ${s3Location.s3String()}")
}

private[atum] def storeCurrentInfoFileOnHdfs(outputHDFSPathFileName: Path, hadoopConfiguration: Configuration = sparkSession.sparkContext.hadoopConfiguration): Unit = {
val fs = FileSystem.get(hadoopConfiguration)
val outputFilePath = if (fs.isDirectory(outputHDFSPathFileName)) {
new Path(outputHDFSPathFileName, outputInfoFileName)
} else {
outputHDFSPathFileName
}

val storer = new ControlMeasuresStorerJsonFile(hadoopConfiguration, outputFilePath)
val storer = new ControlMeasuresHdfsStorerJsonFile(hadoopConfiguration, outputFilePath)
storer.store(accumulator.getControlMeasure)
Atum.log.info(s"Control measurements saved to ${outputFilePath.toUri.toString}")
}
@@ -19,17 +19,32 @@ import java.io.{PrintWriter, StringWriter}

import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener
import za.co.absa.atum.persistence.ControlMeasuresStorerJsonFile
import za.co.absa.atum.utils.ExecutionPlanUtils.{inferOutputFileName, inferOutputInfoFileName}
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.regions.Region
import za.co.absa.atum.persistence.{S3ControlMeasuresStorer, S3KmsSettings}
import za.co.absa.atum.utils.ExecutionPlanUtils._
import za.co.absa.atum.utils.S3Utils

/**
* The class is responsible for listening to DataSet save events and outputting correcpoiding control measurements.
*/
* The class is responsible for listening to DataSet save events and outputting corresponding control measurements.
*/
class SparkQueryExecutionListener(cf: ControlFrameworkState) extends QueryExecutionListener {

override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
if (funcName == "save") {
writeInfoFileForQuery(qe)

cf.accumulator.getStorer match {
case Some(s3storer: S3ControlMeasuresStorer) =>
Atum.log.debug(s"SparkQueryExecutionListener.onSuccess for S3ControlMeasuresStorer: writing to ${s3storer.outputLocation.s3String()}")
writeInfoFileForQueryForS3(qe, s3storer.outputLocation.region, s3storer.kmsSettings)(s3storer.credentialsProvider)

case Some(_) =>
Atum.log.debug(s"SparkQueryExecutionListener.onSuccess: writing to HDFS")
writeInfoFileForQuery(qe)

case None =>
Atum.log.info("No storer is set, therefore no _INFO file will be written automatically on DataFrame save.")
}

// Notify listeners
cf.updateRunCheckpoints(saveInfoFile = true)
@@ -53,8 +68,28 @@

// Write _INFO file to the output directory
infoFilePath.foreach(path => {
Atum.log.info(s"Infered _INFO Path = ${path.toUri.toString}")
cf.storeCurrentInfoFile(path, qe.sparkSession.sparkContext.hadoopConfiguration)
Atum.log.info(s"Inferred _INFO Path = ${path.toUri.toString}")
cf.storeCurrentInfoFileOnHdfs(path, qe.sparkSession.sparkContext.hadoopConfiguration)
})

// Write _INFO file to a registered storer
if (cf.accumulator.isStorerLoaded) {
cf.accumulator.store()
}
}

/** Write the _INFO file with control measurements to the inferred S3 output location based on the query plan */
private def writeInfoFileForQueryForS3(qe: QueryExecution, region: Region, kmsSettings: S3KmsSettings)(implicit credentialsProvider: AwsCredentialsProvider): Unit = {
val infoFilePath = inferOutputInfoFileNameOnS3(qe, cf.outputInfoFileName)

// Write _INFO file to the output directory
infoFilePath.foreach(path => {

import S3Utils.StringS3LocationExt
val location = path.toS3Location(region)

Atum.log.debug(s"Inferred _INFO Location = $location")
cf.storeCurrentInfoFileOnS3(location, kmsSettings)
})

// Write _INFO file to a registered storer
@@ -29,6 +29,13 @@ object ControlMeasuresParser {
ControlUtils.asJson[ControlMeasure](controlMeasure)
}

/**
* The method returns a prettified JSON representation of a [[za.co.absa.atum.model.ControlMeasure]] object
*/
def asJsonPretty(controlMeasure: ControlMeasure): String = {
ControlUtils.asJsonPretty[ControlMeasure](controlMeasure)
}

/**
* The method returns a [[za.co.absa.atum.model.ControlMeasure]] object parsed from JSON string.
*/
@@ -15,10 +15,18 @@

package za.co.absa.atum.persistence

import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import za.co.absa.atum.model.ControlMeasure

/** Trait for control measurements saving to a persistent storage */
trait ControlMeasuresStorer {
def store(controlInfo: ControlMeasure): Unit
def getInfo: String
}

trait S3ControlMeasuresStorer extends ControlMeasuresStorer {
def kmsSettings: S3KmsSettings
def outputLocation: S3Location

def credentialsProvider: AwsCredentialsProvider
}
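
For orientation, the sketch below shows what an implementation of `S3ControlMeasuresStorer` could look like using the AWS SDK v2 S3 client. It is not the PR's actual `ControlMeasuresS3StorerJsonFile`; the `S3Location` field names (`bucketName`, `path`, `region`), the `S3KmsSettings.kmsKeyId` field, and the `ControlMeasuresParser` import path are assumptions based on how these types are used elsewhere in this diff.

```scala
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
import software.amazon.awssdk.core.sync.RequestBody
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.{PutObjectRequest, ServerSideEncryption}
import za.co.absa.atum.model.ControlMeasure
import za.co.absa.atum.persistence.{ControlMeasuresParser, S3ControlMeasuresStorer, S3KmsSettings, S3Location}

// Hypothetical storer writing the serialized measurements as an SSE-KMS encrypted S3 object
class MyS3JsonStorer(val outputLocation: S3Location,
                     val kmsSettings: S3KmsSettings,
                     val credentialsProvider: AwsCredentialsProvider) extends S3ControlMeasuresStorer {

  override def store(controlInfo: ControlMeasure): Unit = {
    val json = ControlMeasuresParser.asJson(controlInfo) // serialize the measurements to JSON

    val s3 = S3Client.builder()
      .region(outputLocation.region)                     // assumed field on S3Location
      .credentialsProvider(credentialsProvider)
      .build()

    val request = PutObjectRequest.builder()
      .bucket(outputLocation.bucketName)                 // assumed field on S3Location
      .key(outputLocation.path)                          // assumed field on S3Location
      .serverSideEncryption(ServerSideEncryption.AWS_KMS)
      .ssekmsKeyId(kmsSettings.kmsKeyId)                 // assumed field on S3KmsSettings
      .build()

    s3.putObject(request, RequestBody.fromString(json))
  }

  override def getInfo: String = s"Hypothetical JSON storer for ${outputLocation.s3String()}"
}
```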