diff --git a/README.md b/README.md
index bb4fb357..27293ca5 100644
--- a/README.md
+++ b/README.md
@@ -86,17 +86,17 @@ indepedently.
## Usage
-#### Coordinate for Maven POM dependancy
+### Coordinate for Maven POM dependency
```xml
<dependency>
   <groupId>za.co.absa</groupId>
   <artifactId>atum</artifactId>
-   <version>0.2.6</version>
+   <version>0.3.0</version>
</dependency>
```
-#### Initial info file generation example
+### Initial info file generation example
Atum provides a method for initial creation of info files from a Spark dataframe. It can be used as is or can serve as a reference implementation for calculating control measurements.
@@ -132,7 +132,7 @@ are not supoprted.
// The info file contents are available as a String object in strJson.
```
-#### An ETL job example
+### An ETL job example
For the full example please see the **SampleMeasurements1** and **SampleMeasurements2** objects from the *atum.examples* project. It uses made-up Wikipedia data for computations. The source data has an info file containing the initial checkpoints,
@@ -183,13 +183,60 @@ In this example the data is read from 'data/input/mydata.csv' file. This data fi
in 'data/input/_INFO'. Two checkpoints are created. Any business logic can be inserted between reading the source data and saving it to Parquet format.
-### Atum library routines
+### Storing Measurements in AWS S3
+Starting with version 0.3.0, persistence support for AWS S3 has been added.
+AWS S3 can be used both as a source to load the measurement data from and as a destination to save the measurements to.
+
+The following example demonstrates the setup:
+```scala
+import org.apache.spark.sql.SparkSession
+import software.amazon.awssdk.auth.credentials.{AwsCredentialsProvider, DefaultCredentialsProvider, ProfileCredentialsProvider}
+import software.amazon.awssdk.regions.Region
+import za.co.absa.atum.AtumImplicits._
+import za.co.absa.atum.persistence.{S3KmsSettings, S3Location}
+
+object S3Example {
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession
+      .builder()
+      .appName("Example S3 Atum init showcase")
+      .getOrCreate()
+
+    // The DefaultCredentialsProvider relies on the default credentials provider chain to obtain the credentials
+    // (e.g. running in EMR/EC2 with the correct role assigned)
+    implicit val defaultCredentialsProvider: AwsCredentialsProvider = DefaultCredentialsProvider.create()
+    // Alternatively, a specific credentials provider can be passed. An example using a local profile named "saml":
+    // implicit val samlCredentialsProvider = ProfileCredentialsProvider.create("saml")
+
+    val sourceS3Location: S3Location = S3Location("my-bucket123", "atum/input/my_amazing_measures.csv.info", Region.EU_WEST_1)
+
+    val kmsKeyId: String = "arn:aws:kms:eu-west-1:123456789012:key/12345678-90ab-cdef-1234-567890abcdef" // just example
+    val destinationS3Config: (S3Location, S3KmsSettings) = (
+      S3Location("my-bucket123", "atum/output/my_amazing_measures2.csv.info", Region.EU_WEST_1),
+      S3KmsSettings(kmsKeyId)
+    )
+
+    import spark.implicits._
+
+    // Initializing the library to hook up to Apache Spark with S3 persistence
+    spark.enableControlMeasuresTrackingForS3(
+      sourceS3Location = Some(sourceS3Location),
+      destinationS3Config = Some(destinationS3Config)
+    ).setControlMeasuresWorkflow("A job with measurements saved to S3")
+  }
+}
+```
+The rest of the processing logic and programmatic approach to the library remains unchanged.
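+
+Measurements can also be written to S3 explicitly for a given dataframe via the `writeInfoFileOnS3` method added in this version. The snippet below is only an illustrative sketch: the object name, bucket, key, region, KMS key id and input path are placeholders, and tracking has to be enabled before an info file can be written.
+```scala
+import org.apache.spark.sql.SparkSession
+import software.amazon.awssdk.auth.credentials.{AwsCredentialsProvider, DefaultCredentialsProvider}
+import software.amazon.awssdk.regions.Region
+import za.co.absa.atum.AtumImplicits._
+import za.co.absa.atum.persistence.{S3KmsSettings, S3Location}
+
+object S3ExplicitWriteExample {
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession.builder().appName("Explicit S3 info file write").getOrCreate()
+
+    implicit val credentialsProvider: AwsCredentialsProvider = DefaultCredentialsProvider.create()
+
+    // Tracking must be enabled first; with no arguments the info file locations are inferred from the data sources
+    spark.enableControlMeasuresTracking()
+      .setControlMeasuresWorkflow("Explicit S3 info file write")
+
+    // Placeholder bucket, key, region and KMS key id - replace with real values
+    val location = S3Location("my-bucket123", "atum/output/explicit_measures.csv.info", Region.EU_WEST_1)
+    val kms = S3KmsSettings("arn:aws:kms:eu-west-1:123456789012:key/12345678-90ab-cdef-1234-567890abcdef")
+
+    // Writes the current control measurements of the dataframe to the given S3 location
+    spark.read
+      .parquet("data/output/stage1_job_results")
+      .writeInfoFileOnS3(location, kms)
+  }
+}
+```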
+
+
+## Atum library routines
The summary of common control framework routines you can use as Spark and Dataframe implicits is as follows:
| Routine | Description | Example usage |
| -------------- |:-------------------- |:---------------|
-| enableControlMeasuresTracking(sourceInfoFile: *String*, destinationInfoFile: *String*) | Enable control measurements tracking. Source and destination info file paths can be omitted. If ommited, they will be automatically inferred from the input/output data sources. | spark.enableControlMeasurementsTracking() |
+| enableControlMeasuresTracking(sourceInfoFile: *String*, destinationInfoFile: *String*) | Enable control measurements tracking. Source and destination info file paths can be omitted. If omitted, they will be automatically inferred from the input/output data sources. | spark.enableControlMeasuresTracking() |
+| enableControlMeasuresTrackingForS3(sourceS3Location: *Option[S3Location]*, destinationS3Config: *Option[(S3Location, S3KmsSettings)]*) | Enable control measurements tracking in S3. Source and destination parameters can be omitted. If omitted, the loading/storing part will not be used. | spark.enableControlMeasuresTrackingForS3(optionalSourceS3Location, optionalDestinationS3Config) |
| isControlMeasuresTrackingEnabled: *Boolean* | Returns true if control measurements tracking is enabled. | if (spark.isControlMeasuresTrackingEnabled) {/*do something*/} |
| disableControlMeasuresTracking() | Explicitly turn off control measurements tracking. | spark.disableControlMeasuresTracking() |
| setCheckpoint(name: *String*) | Calculates the control measurements and appends a new checkpoint. | df.setCheckpoint("Conformance Started") |
@@ -204,7 +251,7 @@ The summary of common control framework routines you can use as Spark and Datafr
| disableCaching() | Turns off caching that happens every time a checkpoint is generated. | disableCaching() |
| setCachingStorageLevel(cacheStorageLevel: *StorageLevel*) | Specifies a Spark storage level to use for caching. Can be one of the following: `NONE`, `DISK_ONLY`, `DISK_ONLY_2`, `MEMORY_ONLY`, `MEMORY_ONLY_2`, `MEMORY_ONLY_SER`, `MEMORY_ONLY_SER_2`, `MEMORY_AND_DISK`, `MEMORY_AND_DISK_2`, `MEMORY_AND_DISK_SER`, `MEMORY_AND_DISK_SER_2`, `OFF_HEAP`. | setCachingStorageLevel(StorageLevel.MEMORY_AND_DISK) |
-### Control measurement types
+## Control measurement types
The control measurement of a column is a hash sum. It can be calculated differently depending on the column's data type and on business requirements.
This table represents all currently supported measurement types: diff --git a/atum/pom.xml b/atum/pom.xml index 0286bd49..18a91fa4 100644 --- a/atum/pom.xml +++ b/atum/pom.xml @@ -24,7 +24,7 @@ za.co.absa atum-parent - 0.2.7-SNAPSHOT + 0.3.0-SNAPSHOT @@ -35,6 +35,21 @@ ${json4s.version} provided + + + org.mockito + mockito-scala_${scala.compat.version} + ${mockito.scala.version} + test + + + + org.mockito + mockito-scala-scalatest_${scala.compat.version} + ${mockito.scala.version} + test + + diff --git a/atum/src/main/scala/za/co/absa/atum/AtumImplicits.scala b/atum/src/main/scala/za/co/absa/atum/AtumImplicits.scala index f37996bd..f85d0d0c 100644 --- a/atum/src/main/scala/za/co/absa/atum/AtumImplicits.scala +++ b/atum/src/main/scala/za/co/absa/atum/AtumImplicits.scala @@ -15,12 +15,16 @@ package za.co.absa.atum -import scala.language.implicitConversions import org.apache.hadoop.fs.Path import org.apache.spark.sql.{Dataset, Row, SparkSession} +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider import za.co.absa.atum.core.Atum.controlFrameworkState -import za.co.absa.atum.core.{Atum, Constants, SparkEventListener, SparkQueryExecutionListener} +import za.co.absa.atum.core.{Atum, Constants} import za.co.absa.atum.persistence._ +import za.co.absa.atum.persistence.hdfs.{ControlMeasuresHdfsLoaderJsonFile, ControlMeasuresHdfsStorerJsonFile} +import za.co.absa.atum.persistence.s3.{ControlMeasuresS3LoaderJsonFile, ControlMeasuresS3StorerJsonFile} + +import scala.language.implicitConversions /** * The object contains implicit methods for Control Framework @@ -43,10 +47,12 @@ import za.co.absa.atum.persistence._ * */ object AtumImplicits { - type DefaultControlInfoStorer = ControlMeasuresStorerJsonFile - type DefaultControlInfoLoader = ControlMeasuresLoaderJsonFile + type DefaultControlInfoStorer = ControlMeasuresHdfsStorerJsonFile + type DefaultControlInfoLoader = ControlMeasuresHdfsLoaderJsonFile - implicit def StringToPath(path: String): Path = new Path(path) + implicit class StringPathExt(path: String) { + def toPath: Path = new Path(path) + } /** * The class contains implicit methods for [[org.apache.spark.sql.SparkSession]]. @@ -63,7 +69,7 @@ object AtumImplicits { } /** - * Enable control measurements tracking. + * Enable control measurements tracking on HDFS. * Both input and output info file paths need to be provided * * Example info file path name: "data/input/wikidata.csv.info" @@ -75,8 +81,29 @@ object AtumImplicits { destinationInfoFile: String = ""): SparkSession = { val hadoopConfiguration = sparkSession.sparkContext.hadoopConfiguration - val loader = if (sourceInfoFile.isEmpty) None else Some(new DefaultControlInfoLoader(hadoopConfiguration, sourceInfoFile)) - val storer = if (destinationInfoFile.isEmpty) None else Some(new DefaultControlInfoStorer(hadoopConfiguration, destinationInfoFile)) + val loader = if (sourceInfoFile.isEmpty) None else Some(new DefaultControlInfoLoader(hadoopConfiguration, sourceInfoFile.toPath)) + val storer = if (destinationInfoFile.isEmpty) None else Some(new DefaultControlInfoStorer(hadoopConfiguration, destinationInfoFile.toPath)) + + enableControlMeasuresTracking(loader, storer) + } + + /** + * Enable S3-based control measurements tracking. 
+ * + * @param sourceS3Location s3 location to load info files from in S3 + * @param destinationS3Config s3 location and kms settings to save the data to in S3 + * @param credentialsProvider If you do not have a specific Credentials provider, use the default + * {@link software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider#create()} + * @return spark session with atum tracking enabled + */ + def enableControlMeasuresTrackingForS3(sourceS3Location: Option[S3Location], + destinationS3Config: Option[(S3Location, S3KmsSettings)]) + (implicit credentialsProvider: AwsCredentialsProvider): SparkSession = { + + val loader = sourceS3Location.map(new ControlMeasuresS3LoaderJsonFile(_)) + val storer = destinationS3Config.map { case (destLoc, kms) => + new ControlMeasuresS3StorerJsonFile(destLoc, kms) + } enableControlMeasuresTracking(loader, storer) } @@ -286,7 +313,12 @@ object AtumImplicits { * @param outputPath A directory or a file name to save the info file to. */ def writeInfoFile(outputPath: String): Dataset[Row] = { - Atum.controlFrameworkState.storeCurrentInfoFile(outputPath) + Atum.controlFrameworkState.storeCurrentInfoFileOnHdfs(outputPath.toPath) + dataset + } + + def writeInfoFileOnS3(s3Location: S3Location, s3KmsSettings: S3KmsSettings)(implicit credentialsProvider: AwsCredentialsProvider): Dataset[Row] = { + Atum.controlFrameworkState.storeCurrentInfoFileOnS3(s3Location, s3KmsSettings) dataset } diff --git a/atum/src/main/scala/za/co/absa/atum/core/Accumulator.scala b/atum/src/main/scala/za/co/absa/atum/core/Accumulator.scala index ea60572b..ca043cef 100644 --- a/atum/src/main/scala/za/co/absa/atum/core/Accumulator.scala +++ b/atum/src/main/scala/za/co/absa/atum/core/Accumulator.scala @@ -60,6 +60,12 @@ class Accumulator() { } } + /** + * Ability to view the storer if set. + * @return + */ + private[atum] def getStorer: Option[ControlMeasuresStorer] = if (isStorerLoaded) Some(storer) else None + /** * The method returns Control Info object in which checkpoints are sorted by calculation order. 
*/ diff --git a/atum/src/main/scala/za/co/absa/atum/core/ControlFrameworkState.scala b/atum/src/main/scala/za/co/absa/atum/core/ControlFrameworkState.scala index 9663ca36..6f2535ec 100644 --- a/atum/src/main/scala/za/co/absa/atum/core/ControlFrameworkState.scala +++ b/atum/src/main/scala/za/co/absa/atum/core/ControlFrameworkState.scala @@ -19,11 +19,14 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.sql.{Dataset, Row, SparkSession} import org.apache.spark.storage.StorageLevel +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider import za.co.absa.atum.AtumImplicits.DefaultControlInfoLoader import za.co.absa.atum.core.Atum.log import za.co.absa.atum.core.ControlType.Count import za.co.absa.atum.model.{RunError, RunState, _} -import za.co.absa.atum.persistence.{ControlMeasuresLoader, ControlMeasuresStorer, ControlMeasuresStorerJsonFile} +import za.co.absa.atum.persistence.hdfs.ControlMeasuresHdfsStorerJsonFile +import za.co.absa.atum.persistence.s3.ControlMeasuresS3StorerJsonFile +import za.co.absa.atum.persistence.{ControlMeasuresLoader, ControlMeasuresStorer, S3KmsSettings, S3Location} import za.co.absa.atum.plugins.EventListener import za.co.absa.atum.utils.ExecutionPlanUtils.inferInputInfoFileName @@ -245,7 +248,13 @@ class ControlFrameworkState(sparkSession: SparkSession) { } } - private[atum] def storeCurrentInfoFile(outputHDFSPathFileName: Path, hadoopConfiguration: Configuration = sparkSession.sparkContext.hadoopConfiguration): Unit = { + private[atum] def storeCurrentInfoFileOnS3(s3Location: S3Location, s3KmsSettings: S3KmsSettings)(implicit credentialsProvider: AwsCredentialsProvider): Unit = { + val storer = new ControlMeasuresS3StorerJsonFile(s3Location, s3KmsSettings) + storer.store(accumulator.getControlMeasure) + Atum.log.info(s"Control measurements saved to ${s3Location.s3String()}") + } + + private[atum] def storeCurrentInfoFileOnHdfs(outputHDFSPathFileName: Path, hadoopConfiguration: Configuration = sparkSession.sparkContext.hadoopConfiguration): Unit = { val fs = FileSystem.get(hadoopConfiguration) val outputFilePath = if (fs.isDirectory(outputHDFSPathFileName)) { new Path(outputHDFSPathFileName, outputInfoFileName) @@ -253,7 +262,7 @@ class ControlFrameworkState(sparkSession: SparkSession) { outputHDFSPathFileName } - val storer = new ControlMeasuresStorerJsonFile(hadoopConfiguration, outputFilePath) + val storer = new ControlMeasuresHdfsStorerJsonFile(hadoopConfiguration, outputFilePath) storer.store(accumulator.getControlMeasure) Atum.log.info(s"Control measurements saved to ${outputFilePath.toUri.toString}") } diff --git a/atum/src/main/scala/za/co/absa/atum/core/SparkQueryExecutionListener.scala b/atum/src/main/scala/za/co/absa/atum/core/SparkQueryExecutionListener.scala index f5a8d42a..d62e1f59 100644 --- a/atum/src/main/scala/za/co/absa/atum/core/SparkQueryExecutionListener.scala +++ b/atum/src/main/scala/za/co/absa/atum/core/SparkQueryExecutionListener.scala @@ -19,17 +19,32 @@ import java.io.{PrintWriter, StringWriter} import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.util.QueryExecutionListener -import za.co.absa.atum.persistence.ControlMeasuresStorerJsonFile -import za.co.absa.atum.utils.ExecutionPlanUtils.{inferOutputFileName, inferOutputInfoFileName} +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider +import software.amazon.awssdk.regions.Region +import za.co.absa.atum.persistence.{S3ControlMeasuresStorer, S3KmsSettings} 
+import za.co.absa.atum.utils.ExecutionPlanUtils._ +import za.co.absa.atum.utils.S3Utils /** - * The class is responsible for listening to DataSet save events and outputting correcpoiding control measurements. - */ + * The class is responsible for listening to DataSet save events and outputting corresponding control measurements. + */ class SparkQueryExecutionListener(cf: ControlFrameworkState) extends QueryExecutionListener { override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { if (funcName == "save") { - writeInfoFileForQuery(qe) + + cf.accumulator.getStorer match { + case Some(s3storer: S3ControlMeasuresStorer) => + Atum.log.debug(s"SparkQueryExecutionListener.onSuccess for S3ControlMeasuresStorer: writing to ${s3storer.outputLocation.s3String()}") + writeInfoFileForQueryForS3(qe, s3storer.outputLocation.region, s3storer.kmsSettings)(s3storer.credentialsProvider) + + case Some(_) => + Atum.log.debug(s"SparkQueryExecutionListener.onSuccess: writing to HDFS") + writeInfoFileForQuery(qe) + + case None => + Atum.log.info("No storer is set, therefore no data will be automatically written to an _INFO file on DataFrame save.") + } // Notify listeners cf.updateRunCheckpoints(saveInfoFile = true) @@ -53,8 +68,28 @@ class SparkQueryExecutionListener(cf: ControlFrameworkState) extends QueryExecut // Write _INFO file to the output directory infoFilePath.foreach(path => { - Atum.log.info(s"Infered _INFO Path = ${path.toUri.toString}") - cf.storeCurrentInfoFile(path, qe.sparkSession.sparkContext.hadoopConfiguration) + Atum.log.info(s"Inferred _INFO Path = ${path.toUri.toString}") + cf.storeCurrentInfoFileOnHdfs(path, qe.sparkSession.sparkContext.hadoopConfiguration) + }) + + // Write _INFO file to a registered storer + if (cf.accumulator.isStorerLoaded) { + cf.accumulator.store() + } + } + + /** Write _INFO file with control measurements to the output directory based on the query plan */ + private def writeInfoFileForQueryForS3(qe: QueryExecution, region: Region, kmsSettings: S3KmsSettings)(implicit credentialsProvider: AwsCredentialsProvider): Unit = { + val infoFilePath = inferOutputInfoFileNameOnS3(qe, cf.outputInfoFileName) + + // Write _INFO file to the output directory + infoFilePath.foreach(path => { + + import S3Utils.StringS3LocationExt + val location = path.toS3Location(region) + + Atum.log.debug(s"Inferred _INFO Location = $location") + cf.storeCurrentInfoFileOnS3(location, kmsSettings) }) // Write _INFO file to a registered storer diff --git a/atum/src/main/scala/za/co/absa/atum/persistence/ControlMeasuresParser.scala b/atum/src/main/scala/za/co/absa/atum/persistence/ControlMeasuresParser.scala index 981c4f43..22c45055 100644 --- a/atum/src/main/scala/za/co/absa/atum/persistence/ControlMeasuresParser.scala +++ b/atum/src/main/scala/za/co/absa/atum/persistence/ControlMeasuresParser.scala @@ -29,6 +29,13 @@ object ControlMeasuresParser { ControlUtils.asJson[ControlMeasure](controlMeasure) } + /** + * The method returns a prettified JSON representation of a [[za.co.absa.atum.model.ControlMeasure]] object + */ + def asJsonPretty(controlMeasure: ControlMeasure): String = { + ControlUtils.asJsonPretty[ControlMeasure](controlMeasure) + } + /** * The method returns a [[za.co.absa.atum.model.ControlMeasure]] object parsed from JSON string.
*/ diff --git a/atum/src/main/scala/za/co/absa/atum/persistence/ControlMeasuresStorer.scala b/atum/src/main/scala/za/co/absa/atum/persistence/ControlMeasuresStorer.scala index 483b4b27..2ea99afd 100644 --- a/atum/src/main/scala/za/co/absa/atum/persistence/ControlMeasuresStorer.scala +++ b/atum/src/main/scala/za/co/absa/atum/persistence/ControlMeasuresStorer.scala @@ -15,6 +15,7 @@ package za.co.absa.atum.persistence +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider import za.co.absa.atum.model.ControlMeasure /** Trait for control measurements saving to a persistent storage */ @@ -22,3 +23,10 @@ trait ControlMeasuresStorer { def store(controlInfo: ControlMeasure): Unit def getInfo: String } + +trait S3ControlMeasuresStorer extends ControlMeasuresStorer { + def kmsSettings: S3KmsSettings + def outputLocation: S3Location + + def credentialsProvider: AwsCredentialsProvider +} diff --git a/atum/src/main/scala/za/co/absa/atum/persistence/ControlMeasuresLoaderJsonFile.scala b/atum/src/main/scala/za/co/absa/atum/persistence/hdfs/ControlMeasuresHdfsLoaderJsonFile.scala similarity index 69% rename from atum/src/main/scala/za/co/absa/atum/persistence/ControlMeasuresLoaderJsonFile.scala rename to atum/src/main/scala/za/co/absa/atum/persistence/hdfs/ControlMeasuresHdfsLoaderJsonFile.scala index ca33aca1..266b4ab1 100644 --- a/atum/src/main/scala/za/co/absa/atum/persistence/ControlMeasuresLoaderJsonFile.scala +++ b/atum/src/main/scala/za/co/absa/atum/persistence/hdfs/ControlMeasuresHdfsLoaderJsonFile.scala @@ -13,23 +13,20 @@ * limitations under the License. */ -package za.co.absa.atum.persistence +package za.co.absa.atum.persistence.hdfs -import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import za.co.absa.atum.model.ControlMeasure -import za.co.absa.atum.utils.ControlUtils - -import scala.collection.JavaConverters._ +import za.co.absa.atum.persistence.{ControlMeasuresLoader, ControlMeasuresParser} +import za.co.absa.atum.utils.{ControlUtils, HdfsFileUtils} /** A loader of control measurements from a JSON file stored in HDFS filesystem. */ -class ControlMeasuresLoaderJsonFile(hadoopConfiguration: Configuration, path: Path) extends ControlMeasuresLoader { +class ControlMeasuresHdfsLoaderJsonFile(hadoopConfiguration: Configuration, path: Path) extends ControlMeasuresLoader { override def load(): ControlMeasure = { - val fs = FileSystem.get(hadoopConfiguration) - val stream = fs.open(path) - val controlInfoJson = try IOUtils.readLines(stream).asScala.mkString("\n") finally stream.close() + implicit val fs = FileSystem.get(hadoopConfiguration) + val controlInfoJson = HdfsFileUtils.readHdfsFileToString(path) ControlUtils.preprocessControlMeasure(ControlMeasuresParser fromJson controlInfoJson) } diff --git a/atum/src/main/scala/za/co/absa/atum/persistence/ControlMeasuresStorerJsonFile.scala b/atum/src/main/scala/za/co/absa/atum/persistence/hdfs/ControlMeasuresHdfsStorerJsonFile.scala similarity index 86% rename from atum/src/main/scala/za/co/absa/atum/persistence/ControlMeasuresStorerJsonFile.scala rename to atum/src/main/scala/za/co/absa/atum/persistence/hdfs/ControlMeasuresHdfsStorerJsonFile.scala index 5e325f51..291eba47 100644 --- a/atum/src/main/scala/za/co/absa/atum/persistence/ControlMeasuresStorerJsonFile.scala +++ b/atum/src/main/scala/za/co/absa/atum/persistence/hdfs/ControlMeasuresHdfsStorerJsonFile.scala @@ -13,16 +13,17 @@ * limitations under the License. 
*/ -package za.co.absa.atum.persistence +package za.co.absa.atum.persistence.hdfs import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.permission.FsPermission import org.apache.hadoop.fs.{FileSystem, Path} import za.co.absa.atum.model.ControlMeasure +import za.co.absa.atum.persistence.{ControlMeasuresParser, ControlMeasuresStorer} import za.co.absa.atum.utils.ARMImplicits /** A storer of control measurements to HDFS filesystem as a JSON file . */ -class ControlMeasuresStorerJsonFile(hadoopConfiguration: Configuration, path: Path) extends ControlMeasuresStorer { +class ControlMeasuresHdfsStorerJsonFile(hadoopConfiguration: Configuration, path: Path) extends ControlMeasuresStorer { override def store(controlInfo: ControlMeasure): Unit = { val serialized = ControlMeasuresParser asJson controlInfo saveDataToFile(serialized) diff --git a/atum/src/main/scala/za/co/absa/atum/persistence/s3/ControlMeasuresS3LoaderJsonFile.scala b/atum/src/main/scala/za/co/absa/atum/persistence/s3/ControlMeasuresS3LoaderJsonFile.scala new file mode 100644 index 00000000..421e4029 --- /dev/null +++ b/atum/src/main/scala/za/co/absa/atum/persistence/s3/ControlMeasuresS3LoaderJsonFile.scala @@ -0,0 +1,49 @@ +/* + * Copyright 2018-2019 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.persistence.s3 + +import software.amazon.awssdk.auth.credentials.{AwsCredentialsProvider, DefaultCredentialsProvider} +import software.amazon.awssdk.services.s3.S3Client +import software.amazon.awssdk.services.s3.model.GetObjectRequest +import za.co.absa.atum.model.ControlMeasure +import za.co.absa.atum.persistence.{ControlMeasuresLoader, ControlMeasuresParser, S3Location} +import za.co.absa.atum.utils.{ControlUtils, S3Utils} + +/** + * A loader of control measurements from a JSON file stored in AWS S3. + * @param inputLocation S3 location to read the json measurements from + * @param credentialsProvider a specific credentials provider (e.g. SAML profile). Consider using [[DefaultCredentialsProvider#create()]] when in doubt. 
+ */ +class ControlMeasuresS3LoaderJsonFile(inputLocation: S3Location) + (implicit credentialsProvider: AwsCredentialsProvider) extends ControlMeasuresLoader { + override def load(): ControlMeasure = { + val s3Client: S3Client = getS3Client + + val getRequest = GetObjectRequest + .builder().bucket(inputLocation.bucketName).key(inputLocation.path) + .build() + + val controlInfoJson = s3Client.getObjectAsBytes(getRequest).asUtf8String() + ControlUtils.preprocessControlMeasure(ControlMeasuresParser fromJson controlInfoJson) + } + + override def getInfo: String = { + s"JSON deserializer from ${inputLocation.s3String()}" + } + + private[s3] def getS3Client: S3Client = S3Utils.getS3Client(inputLocation.region, credentialsProvider) + +} diff --git a/atum/src/main/scala/za/co/absa/atum/persistence/s3/ControlMeasuresS3StorerJsonFile.scala b/atum/src/main/scala/za/co/absa/atum/persistence/s3/ControlMeasuresS3StorerJsonFile.scala new file mode 100644 index 00000000..03cb9dc7 --- /dev/null +++ b/atum/src/main/scala/za/co/absa/atum/persistence/s3/ControlMeasuresS3StorerJsonFile.scala @@ -0,0 +1,65 @@ +/* + * Copyright 2018-2019 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.persistence.s3 + +import software.amazon.awssdk.auth.credentials.{AwsCredentialsProvider, DefaultCredentialsProvider} +import software.amazon.awssdk.core.exception.SdkException +import software.amazon.awssdk.core.sync.RequestBody +import software.amazon.awssdk.services.s3.S3Client +import software.amazon.awssdk.services.s3.model.PutObjectRequest +import za.co.absa.atum.model.ControlMeasure +import za.co.absa.atum.persistence.{ControlMeasuresParser, S3ControlMeasuresStorer, S3KmsSettings, S3Location} +import za.co.absa.atum.utils.S3Utils + +/** + * A storer of control measurements to a JSON file stored in AWS S3. + * + * @param outputLocation s3 location to save measurements data to + * @param kmsSettings KMS settings - server side encryption configuration + * @param credentialsProvider a specific credentials provider (e.g. SAML profile). Consider using [[DefaultCredentialsProvider#create()]] when in doubt. + */ +class ControlMeasuresS3StorerJsonFile(val outputLocation: S3Location, val kmsSettings: S3KmsSettings) + (implicit val credentialsProvider: AwsCredentialsProvider) extends S3ControlMeasuresStorer { + + /** + * Stores the `controlInfo` measurement to an S3 location. + * + * @param controlInfo measurements to store + * @throws SdkException when storing fails. 
+ */ + override def store(controlInfo: ControlMeasure): Unit = { + val serialized = ControlMeasuresParser asJson controlInfo + saveDataToFile(serialized) + } + + private def saveDataToFile(data: String): Unit = { + val s3Client = getS3Client + + val putRequest = PutObjectRequest.builder.bucket(outputLocation.bucketName).key(outputLocation.path) + .serverSideEncryption(kmsSettings.serverSideEncryption) + .ssekmsKeyId(kmsSettings.kmsKeyId) + .build() + + // would throw S3Exception or SdkClientException in case of failure (base exception class: SdkException) + s3Client.putObject(putRequest, RequestBody.fromString(data)) + } + + override def getInfo: String = { + s"JSON serializer for Storer to ${outputLocation.s3String()}" + } + + private[s3] def getS3Client: S3Client = S3Utils.getS3Client(outputLocation.region, credentialsProvider) +} diff --git a/atum/src/main/scala/za/co/absa/atum/persistence/s3/S3Location.scala b/atum/src/main/scala/za/co/absa/atum/persistence/s3/S3Location.scala new file mode 100644 index 00000000..4ecd496f --- /dev/null +++ b/atum/src/main/scala/za/co/absa/atum/persistence/s3/S3Location.scala @@ -0,0 +1,16 @@ +package za.co.absa.atum.persistence + +import software.amazon.awssdk.regions.Region +import software.amazon.awssdk.services.s3.model.ServerSideEncryption + +case class S3Location(bucketName: String, path: String, region: Region) { + /** + * Returns formatted S3 string, e.g. `s3://myBucket/path/to/somewhere` + * @param protocol S3 protocol prefix, e.g. s3, s3n, s3a. Default = "s3". + * @return formatted s3 string + */ + def s3String(protocol: String = "s3"): String = s"$protocol://$bucketName/$path" +} + +case class S3KmsSettings(kmsKeyId: String, serverSideEncryption: ServerSideEncryption = ServerSideEncryption.AWS_KMS) + diff --git a/atum/src/main/scala/za/co/absa/atum/utils/ExecutionPlanUtils.scala b/atum/src/main/scala/za/co/absa/atum/utils/ExecutionPlanUtils.scala index 316683c5..17223c48 100644 --- a/atum/src/main/scala/za/co/absa/atum/utils/ExecutionPlanUtils.scala +++ b/atum/src/main/scala/za/co/absa/atum/utils/ExecutionPlanUtils.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation, SaveIntoDataSourceCommand} import org.apache.spark.sql.{Dataset, Row} import za.co.absa.atum.core.Constants +import za.co.absa.atum.utils.FileUtils.PathJoin /** * This object contains utils for traversing execution plan DAG to infer control measurement input/output paths @@ -90,7 +91,7 @@ } /** - * The method returns output control measurements info file name inferred from the source dataset + * The method returns output control measurements info file name inferred from the source dataset on HDFS * * @param qe A query execution object where output path name will be searched * @param infoFileName A file name of an info file, e.g. "_INFO" @@ -112,7 +113,28 @@ } } - /** + /** + * The method returns output control measurements info file name inferred from the source dataset on S3 + * + * @param qe A query execution object where output path name will be searched + * @param infoFileName A file name of an info file, e.g.
"_INFO" + * + * @return The inferred output control measurements file path of the source dataset + */ + def inferOutputInfoFileNameOnS3(qe: QueryExecution, infoFileName: String = Constants.DefaultInfoFileName): Option[String] = { + qe.analyzed match { + case s: SaveIntoDataSourceCommand => + Some(s.options("path") / infoFileName) + case _ => + log.warn(s"Logical plan: ${qe.logical.treeString}") + log.warn(s"Analyzed plan: ${qe.analyzed.treeString}") + log.warn(s"Optimized plan: ${qe.optimizedPlan.treeString}") + log.error(s"Unable to infer output path for control measurements for query execution $qe.") + None + } + } + + /** * The method returns source file names of a DataSet execution plan by traversing the DAG. * Thanks za.co.absa.spline.core * diff --git a/atum/src/main/scala/za/co/absa/atum/utils/FileUtils.scala b/atum/src/main/scala/za/co/absa/atum/utils/FileUtils.scala new file mode 100644 index 00000000..b179f46d --- /dev/null +++ b/atum/src/main/scala/za/co/absa/atum/utils/FileUtils.scala @@ -0,0 +1,18 @@ +package za.co.absa.atum.utils + +object FileUtils { + def readFileToString(path: String): String = { + val testTxtSource = scala.io.Source.fromFile(path) + val str = testTxtSource.mkString + testTxtSource.close() + + str + } + + implicit class PathJoin(path: String) { + def /(pathSegment: String): String = { + s"${path.stripSuffix("/")}/${pathSegment.stripPrefix("/")}" + } + } + +} diff --git a/atum/src/main/scala/za/co/absa/atum/utils/HdfsFileUtils.scala b/atum/src/main/scala/za/co/absa/atum/utils/HdfsFileUtils.scala new file mode 100644 index 00000000..8820e80a --- /dev/null +++ b/atum/src/main/scala/za/co/absa/atum/utils/HdfsFileUtils.scala @@ -0,0 +1,18 @@ +package za.co.absa.atum.utils + +import org.apache.commons.io.IOUtils +import org.apache.hadoop.fs.{FileSystem, Path} + +import scala.collection.JavaConverters._ + +object HdfsFileUtils { + + def readHdfsFileToString(path: Path)(implicit fs: FileSystem): String = { + val stream = fs.open(path) + try + IOUtils.readLines(stream).asScala.mkString("\n") + finally + stream.close() + } + +} diff --git a/atum/src/main/scala/za/co/absa/atum/utils/S3Utils.scala b/atum/src/main/scala/za/co/absa/atum/utils/S3Utils.scala new file mode 100644 index 00000000..a55b6009 --- /dev/null +++ b/atum/src/main/scala/za/co/absa/atum/utils/S3Utils.scala @@ -0,0 +1,44 @@ +package za.co.absa.atum.utils + +import software.amazon.awssdk.auth.credentials.{AwsCredentialsProvider, ProfileCredentialsProvider} +import software.amazon.awssdk.regions.Region +import software.amazon.awssdk.services.s3.S3Client +import za.co.absa.atum.core.Atum.log +import za.co.absa.atum.persistence.S3Location + +object S3Utils { + + def getLocalProfileCredentialsProvider(credentialsProfileName: String): ProfileCredentialsProvider = { + val localProfileCredentials = ProfileCredentialsProvider.create(credentialsProfileName) + log.debug(s"Credentials of local $credentialsProfileName profile =" + + s" ${localProfileCredentials.resolveCredentials().accessKeyId()}, ${localProfileCredentials.resolveCredentials().secretAccessKey().take(5)}...") + + localProfileCredentials + } + + def getS3Client(region: Region, credentialsProvider: AwsCredentialsProvider): S3Client = { + S3Client.builder() + .region(region) + .credentialsProvider(credentialsProvider) + .build() + } + + // hint: https://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html#bucketnamingrules + val S3LocationRx = "s3(?:a|n)?://([-a-z0-9.]{3,63})/(.*)".r + + implicit class StringS3LocationExt(path: String) { + 
+ def toS3Location(withRegion: Region): S3Location = { + path match { + case S3LocationRx(bucketName, path) => S3Location(bucketName, path, withRegion) + case _ => throw new IllegalArgumentException(s"Could not parse S3 Location from $path using rx $S3LocationRx.") + } + } + + def isValidS3Path: Boolean = path match { + case S3LocationRx(_, _) => true + case _ => false + } + } + +} diff --git a/atum/src/main/scala/za/co/absa/atum/utils/SparkTestBase.scala b/atum/src/main/scala/za/co/absa/atum/utils/SparkTestBase.scala index 63697264..85d4f99b 100644 --- a/atum/src/main/scala/za/co/absa/atum/utils/SparkTestBase.scala +++ b/atum/src/main/scala/za/co/absa/atum/utils/SparkTestBase.scala @@ -30,6 +30,7 @@ trait SparkTestBase { .config("spark.driver.bindAddress", "127.0.0.1") .config("spark.driver.host", "127.0.0.1") .config("spark.ui.enabled", "false") + .config("spark.testing.memory", 1024*1024*1024) // otherwise may fail based on local machine settings .getOrCreate() } diff --git a/atum/src/test/resources/example_input.info b/atum/src/test/resources/example_input.info new file mode 100644 index 00000000..dba19355 --- /dev/null +++ b/atum/src/test/resources/example_input.info @@ -0,0 +1,29 @@ +{ + "metadata": { + "sourceApplication": "AtumTest", + "country": "CZ", + "historyType": "Snapshot", + "dataFilename": "example_input.csv", + "sourceType": "public", + "version": 1, + "informationDate": "01-01-2020", + "additionalInfo": { } + }, + "checkpoints": [ + { + "name": "checkpointA", + "processStartTime": "01-01-2020 08:00:00", + "processEndTime": "01-01-2020 08:00:10", + "workflowName": "wf1", + "order": 1, + "controls": [ + { + "controlName": "control1", + "controlType": "someControlType", + "controlCol": "column1", + "controlValue": "1234" + } + ] + } + ] +} diff --git a/atum/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/atum/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 00000000..1f0955d4 --- /dev/null +++ b/atum/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline diff --git a/atum/src/test/scala/za/co/absa/atum/BigDecimalToJsonSerializationSpec.scala b/atum/src/test/scala/za/co/absa/atum/BigDecimalToJsonSerializationSpec.scala index cab11840..19153e76 100644 --- a/atum/src/test/scala/za/co/absa/atum/BigDecimalToJsonSerializationSpec.scala +++ b/atum/src/test/scala/za/co/absa/atum/BigDecimalToJsonSerializationSpec.scala @@ -15,13 +15,14 @@ package za.co.absa.atum -import org.scalatest.{FlatSpec, Matchers} import org.json4s._ import org.json4s.jackson.Serialization import org.json4s.jackson.Serialization.write +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers import za.co.absa.atum.model.Measurement -class BigDecimalToJsonSerializationSpec extends FlatSpec with Matchers { +class BigDecimalToJsonSerializationSpec extends AnyFlatSpec with Matchers { implicit val formats: Formats = Serialization.formats(NoTypeHints).withBigDecimal "write" should "serialize a scala.math.BigDecimal" in diff --git a/atum/src/test/scala/za/co/absa/atum/CachingStorageLevelSpec.scala b/atum/src/test/scala/za/co/absa/atum/CachingStorageLevelSpec.scala index 7080b9c7..14805769 100644 --- a/atum/src/test/scala/za/co/absa/atum/CachingStorageLevelSpec.scala +++ b/atum/src/test/scala/za/co/absa/atum/CachingStorageLevelSpec.scala @@ -16,11 +16,13 @@ package za.co.absa.atum import org.apache.spark.storage.StorageLevel -import org.scalatest.{BeforeAndAfter, FlatSpec, 
Matchers} +import org.scalatest.BeforeAndAfter +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers import za.co.absa.atum.core.Atum import za.co.absa.atum.utils.SparkTestBase -class CachingStorageLevelSpec extends FlatSpec with Matchers with SparkTestBase with BeforeAndAfter { +class CachingStorageLevelSpec extends AnyFlatSpec with Matchers with SparkTestBase with BeforeAndAfter { before { Atum.init(spark) diff --git a/atum/src/test/scala/za/co/absa/atum/ControlInfoToJsonSerializationSpec.scala b/atum/src/test/scala/za/co/absa/atum/ControlInfoToJsonSerializationSpec.scala index 76f66e8a..8994897d 100644 --- a/atum/src/test/scala/za/co/absa/atum/ControlInfoToJsonSerializationSpec.scala +++ b/atum/src/test/scala/za/co/absa/atum/ControlInfoToJsonSerializationSpec.scala @@ -15,15 +15,15 @@ package za.co.absa.atum -import org.scalatest.{FlatSpec, Matchers} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers import za.co.absa.atum.model.{Checkpoint, ControlMeasure, ControlMeasureMetadata, Measurement} import za.co.absa.atum.utils.{BuildProperties, ControlUtils} -import za.co.absa.atum.model._ /** * Unit tests for ControlInfo object serialization */ -class ControlInfoToJsonSerializationSpec extends FlatSpec with Matchers { +class ControlInfoToJsonSerializationSpec extends AnyFlatSpec with Matchers { val exampleCtrlInfo = ControlMeasure( metadata = ControlMeasureMetadata( sourceApplication = "FrontArena", diff --git a/atum/src/test/scala/za/co/absa/atum/ControlMeasurementsSpec.scala b/atum/src/test/scala/za/co/absa/atum/ControlMeasurementsSpec.scala index 5fcb9f67..5b27671c 100644 --- a/atum/src/test/scala/za/co/absa/atum/ControlMeasurementsSpec.scala +++ b/atum/src/test/scala/za/co/absa/atum/ControlMeasurementsSpec.scala @@ -16,13 +16,14 @@ package za.co.absa.atum import org.apache.spark.sql.types._ -import org.scalatest.{FlatSpec, Matchers} -import za.co.absa.atum.core.{Constants, ControlType, MeasurementProcessor} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import za.co.absa.atum.core.{ControlType, MeasurementProcessor} import za.co.absa.atum.model.Measurement import za.co.absa.atum.utils.SparkTestBase //noinspection ZeroIndexToHead -class ControlMeasurementsSpec extends FlatSpec with Matchers with SparkTestBase { +class ControlMeasurementsSpec extends AnyFlatSpec with Matchers with SparkTestBase { import spark.implicits._ diff --git a/atum/src/test/scala/za/co/absa/atum/ControlUtilsSpec.scala b/atum/src/test/scala/za/co/absa/atum/ControlUtilsSpec.scala index d1085b3a..0c77aa51 100644 --- a/atum/src/test/scala/za/co/absa/atum/ControlUtilsSpec.scala +++ b/atum/src/test/scala/za/co/absa/atum/ControlUtilsSpec.scala @@ -16,12 +16,12 @@ package za.co.absa.atum import org.apache.spark.sql.types._ -import org.scalatest._ -import za.co.absa.atum.utils.ControlUtils -import za.co.absa.atum.utils.SparkTestBase +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import za.co.absa.atum.utils.{ControlUtils, SparkTestBase} -class ControlUtilsSpec extends FlatSpec with Matchers with SparkTestBase { +class ControlUtilsSpec extends AnyFlatSpec with Matchers with SparkTestBase { import spark.implicits._ private val singleStringColumnDF = spark.sparkContext.parallelize(List("987987", "example", "example", "another example")).toDF diff --git a/atum/src/test/scala/za/co/absa/atum/persistence/TestResources.scala 
b/atum/src/test/scala/za/co/absa/atum/persistence/TestResources.scala new file mode 100644 index 00000000..5cb3b040 --- /dev/null +++ b/atum/src/test/scala/za/co/absa/atum/persistence/TestResources.scala @@ -0,0 +1,24 @@ +package za.co.absa.atum.persistence + +import za.co.absa.atum.model.{Checkpoint, ControlMeasure, ControlMeasureMetadata, Measurement} + +object TestResources { + + object InputInfo { + val localPath: String = getClass.getResource("/example_input.info").getPath + + // conforms to the content of the Resource file `example_input.info` + val controlMeasure = ControlMeasure( + ControlMeasureMetadata("AtumTest", "CZ", "Snapshot", "example_input.csv", "public", 1, "01-01-2020", Map.empty), + runUniqueId = None, + List(Checkpoint("checkpointA", None, None, "01-01-2020 08:00:00", "01-01-2020 08:00:10", "wf1", 1, List( + Measurement("control1", "someControlType", "column1", "1234") + ))) + ) + } + + def filterWhitespaces(content: String): String = { + content.filterNot(_.isWhitespace) + } + +} diff --git a/atum/src/test/scala/za/co/absa/atum/persistence/hdfs/ControlMeasuresHdfsLoaderJsonSpec.scala b/atum/src/test/scala/za/co/absa/atum/persistence/hdfs/ControlMeasuresHdfsLoaderJsonSpec.scala new file mode 100644 index 00000000..efe4b329 --- /dev/null +++ b/atum/src/test/scala/za/co/absa/atum/persistence/hdfs/ControlMeasuresHdfsLoaderJsonSpec.scala @@ -0,0 +1,20 @@ +package za.co.absa.atum.persistence.hdfs + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import za.co.absa.atum.persistence.TestResources + +class ControlMeasuresHdfsLoaderJsonSpec extends AnyFlatSpec with Matchers { + + val inputPath: String = TestResources.InputInfo.localPath + val expectedInputControlMeasure = TestResources.InputInfo.controlMeasure + + "ControlMeasuresHdfsLoaderJsonFile" should "load json file from HDFS" in { + val loadedControlMeasure = new ControlMeasuresHdfsLoaderJsonFile(new Configuration(), new Path(inputPath)).load() + + loadedControlMeasure shouldBe expectedInputControlMeasure + } + +} diff --git a/atum/src/test/scala/za/co/absa/atum/persistence/hdfs/ControlMeasuresHdfsStorerJsonSpec.scala b/atum/src/test/scala/za/co/absa/atum/persistence/hdfs/ControlMeasuresHdfsStorerJsonSpec.scala new file mode 100644 index 00000000..c47db180 --- /dev/null +++ b/atum/src/test/scala/za/co/absa/atum/persistence/hdfs/ControlMeasuresHdfsStorerJsonSpec.scala @@ -0,0 +1,34 @@ +package za.co.absa.atum.persistence.hdfs + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import za.co.absa.atum.persistence.TestResources +import za.co.absa.atum.utils.{FileUtils, HdfsFileUtils} + +class ControlMeasuresHdfsStorerJsonSpec extends AnyFlatSpec with Matchers { + + val expectedFilePath: String = TestResources.InputInfo.localPath + val inputControlMeasure = TestResources.InputInfo.controlMeasure + + val hadoopConfiguration = new Configuration() + implicit val fs = FileSystem.get(hadoopConfiguration) + + "ControlMeasuresHdfsStorerJsonFile" should "store json file to HDFS" in { + + val outputPath = new Path("/tmp/json-hdfs-storing-test") + fs.delete(outputPath, false) + + new ControlMeasuresHdfsStorerJsonFile(new Configuration(), outputPath).store(inputControlMeasure) + + val actualContent = HdfsFileUtils.readHdfsFileToString(outputPath) + val expectedContent = 
FileUtils.readFileToString(expectedFilePath) + + // some output may be prettified while other may not, we do not take this into account. + TestResources.filterWhitespaces(actualContent) shouldBe TestResources.filterWhitespaces(expectedContent) + + fs.delete(outputPath, false) + } + +} diff --git a/atum/src/test/scala/za/co/absa/atum/persistence/s3/ControlMeasuresS3LoaderJsonSpec.scala b/atum/src/test/scala/za/co/absa/atum/persistence/s3/ControlMeasuresS3LoaderJsonSpec.scala new file mode 100644 index 00000000..4386c8cf --- /dev/null +++ b/atum/src/test/scala/za/co/absa/atum/persistence/s3/ControlMeasuresS3LoaderJsonSpec.scala @@ -0,0 +1,52 @@ +package za.co.absa.atum.persistence.s3 + +import org.mockito.captor.{ArgCaptor, Captor} +import org.mockito.scalatest.IdiomaticMockito +import org.mockito.{ArgumentMatchers, Mockito} +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider +import software.amazon.awssdk.core.ResponseBytes +import software.amazon.awssdk.regions.Region +import software.amazon.awssdk.services.s3.S3Client +import software.amazon.awssdk.services.s3.model.{GetObjectRequest, GetObjectResponse} +import za.co.absa.atum.persistence.{S3Location, TestResources} +import za.co.absa.atum.utils.FileUtils + +class ControlMeasuresS3LoaderJsonSpec extends AnyFlatSpec with Matchers with IdiomaticMockito { + + val expectedInputControlMeasure = TestResources.InputInfo.controlMeasure + + "ControlMeasuresS3LoaderJsonFile" should "load measurements from json file from (mocked) S3" in { + + val inputLocation = S3Location(bucketName = "bucket1", "path/to/json.info", region = Region.EU_WEST_2) + val mockedS3Client = mock[S3Client] + val mockedRequest: ResponseBytes[GetObjectResponse] = mock[ResponseBytes[GetObjectResponse]] + + implicit val credentialsProvider = DefaultCredentialsProvider.create() + val loader = new ControlMeasuresS3LoaderJsonFile(inputLocation) { + override def getS3Client: S3Client = mockedS3Client + } + + // This file is mocked to be read from in S3 + val inputFilePath: String = TestResources.InputInfo.localPath + val mockedS3Data = FileUtils.readFileToString(inputFilePath) + + // mock S3 response + Mockito.when(mockedS3Client.getObjectAsBytes(ArgumentMatchers.any[GetObjectRequest]())).thenReturn(mockedRequest) + Mockito.when(mockedRequest.asUtf8String()).thenReturn(mockedS3Data) + val loadedControlMeasure = loader.load() + + // verify request content + val getRequestCaptor: Captor[GetObjectRequest] = ArgCaptor[GetObjectRequest] + Mockito.verify(mockedS3Client).getObjectAsBytes(getRequestCaptor.capture) + val capturedGetRequest = getRequestCaptor.value + + capturedGetRequest.bucket shouldBe "bucket1" + capturedGetRequest.key shouldBe "path/to/json.info" + + // verify returned value + loadedControlMeasure shouldBe expectedInputControlMeasure + } + +} diff --git a/atum/src/test/scala/za/co/absa/atum/persistence/s3/ControlMeasuresS3StorerJsonSpec.scala b/atum/src/test/scala/za/co/absa/atum/persistence/s3/ControlMeasuresS3StorerJsonSpec.scala new file mode 100644 index 00000000..ba5ee1cc --- /dev/null +++ b/atum/src/test/scala/za/co/absa/atum/persistence/s3/ControlMeasuresS3StorerJsonSpec.scala @@ -0,0 +1,61 @@ +package za.co.absa.atum.persistence.hdfs + +import org.mockito.captor.{ArgCaptor, Captor} +import org.mockito.scalatest.IdiomaticMockito +import org.mockito.{ArgumentMatchers, Mockito} +import org.scalatest.flatspec.AnyFlatSpec +import 
org.scalatest.matchers.should.Matchers +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider +import software.amazon.awssdk.core.sync.RequestBody +import software.amazon.awssdk.regions.Region +import software.amazon.awssdk.services.s3.S3Client +import software.amazon.awssdk.services.s3.model.{PutObjectRequest, PutObjectResponse, ServerSideEncryption} +import za.co.absa.atum.persistence.s3.ControlMeasuresS3StorerJsonFile +import za.co.absa.atum.persistence.{S3KmsSettings, S3Location, TestResources} +import za.co.absa.atum.utils.FileUtils + +import scala.io.Source + +class ControlMeasuresS3StorerJsonSpec extends AnyFlatSpec with Matchers with IdiomaticMockito { + + val inputControlMeasure = TestResources.InputInfo.controlMeasure + + "ControlMeasuresS3StorerJsonFile" should "store measurements to json file to S3" in { + + val outputLocation = S3Location(bucketName = "bucket1", "path/to/json.info", region = Region.EU_WEST_2) + val kmsSettigns = S3KmsSettings("testingKeyId123") + val mockedS3Client = mock[S3Client] + + implicit val credentialsProvider = DefaultCredentialsProvider.create() + + val storer = new ControlMeasuresS3StorerJsonFile(outputLocation, kmsSettigns) { + override def getS3Client: S3Client = mockedS3Client + } + + // mock S3 response + Mockito.when(mockedS3Client.putObject(ArgumentMatchers.any[PutObjectRequest], ArgumentMatchers.any[RequestBody])) + .thenReturn(mock[PutObjectResponse]) // anything non-throwing + val loadedControlMeasure = storer.store(inputControlMeasure) + + // verify request content + val putRequestCaptor: Captor[PutObjectRequest] = ArgCaptor[PutObjectRequest] + val requestBodyCaptor: Captor[RequestBody] = ArgCaptor[RequestBody] + + Mockito.verify(mockedS3Client).putObject(putRequestCaptor.capture, requestBodyCaptor.capture) + val (capturedPutRequest, capturedRequestBody) = (putRequestCaptor.value, requestBodyCaptor.value) + + capturedPutRequest.bucket shouldBe "bucket1" + capturedPutRequest.key shouldBe "path/to/json.info" + capturedPutRequest.ssekmsKeyId shouldBe "testingKeyId123" + capturedPutRequest.serverSideEncryption() shouldBe ServerSideEncryption.AWS_KMS + + // This expected request body content should be the same as content of this file (conforms to `inputControlMeasure`) + val sameContentFile: String = TestResources.InputInfo.localPath + val expectedContent = FileUtils.readFileToString(sameContentFile) + + val requestDataContent = Source.fromInputStream(capturedRequestBody.contentStreamProvider().newStream()).mkString + TestResources.filterWhitespaces(requestDataContent) shouldBe TestResources.filterWhitespaces(expectedContent) + + } + +} diff --git a/atum/src/test/scala/za/co/absa/atum/utils/ExecutionPlanUtilsSuite.scala b/atum/src/test/scala/za/co/absa/atum/utils/ExecutionPlanUtilsSuite.scala new file mode 100644 index 00000000..28644135 --- /dev/null +++ b/atum/src/test/scala/za/co/absa/atum/utils/ExecutionPlanUtilsSuite.scala @@ -0,0 +1,53 @@ +package za.co.absa.atum.utils + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand +import org.mockito.Mockito +import org.mockito.scalatest.IdiomaticMockito +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class ExecutionPlanUtilsSuite extends AnyFlatSpec with Matchers with IdiomaticMockito { + + val hadoopConf = new Configuration + + implicit class SimplePath(path: Path) { + // 
disregarding hdfs nameserver prefix or local FS fallback (file://) + def simplePath: String = path.toUri.getPath + } + + "inferOutputInfoFileName" should "derive output file name for HDFS from SaveIntoDataSourceCommand" in { + val qe = mock[QueryExecution] + Mockito.when(qe.analyzed).thenReturn( + SaveIntoDataSourceCommand(null, null, options = Map(("path", "/tmp")), null) + ) + + ExecutionPlanUtils.inferOutputFileName(qe, hadoopConf).get.simplePath shouldBe "/tmp" + } + + "inferOutputInfoFileName" should "derive output info file name for HDFS from SaveIntoDataSourceCommand" in { + val qe = mock[QueryExecution] + val myInfoName = "myInfo" + Mockito.when(qe.analyzed).thenReturn( + SaveIntoDataSourceCommand(null, null, options = Map(("path", "/tmp/here")), null) + ) + + ExecutionPlanUtils.inferOutputInfoFileName(qe, myInfoName).get.simplePath shouldBe "/tmp/here/myInfo" + } + + "inferOutputInfoFileNameOnS3" should "derive output info file name for S3 from SaveIntoDataSourceCommand" in { + val qe = mock[QueryExecution] + val myInfoName = "myInfo" + Mockito.when(qe.analyzed).thenReturn( + // trailing slash should get taken care of + SaveIntoDataSourceCommand(null, null, options = Map(("path", "/tmp/here2/")), null) + ) + + ExecutionPlanUtils.inferOutputInfoFileNameOnS3(qe, myInfoName).get shouldBe "/tmp/here2/myInfo" + } + + + +} diff --git a/atum/src/test/scala/za/co/absa/atum/utils/FileUtilsSpec.scala b/atum/src/test/scala/za/co/absa/atum/utils/FileUtilsSpec.scala new file mode 100644 index 00000000..6e723046 --- /dev/null +++ b/atum/src/test/scala/za/co/absa/atum/utils/FileUtilsSpec.scala @@ -0,0 +1,17 @@ +package za.co.absa.atum.utils + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +class FileUtilsSpec extends AnyFlatSpec with Matchers { + + "PathJoin" should "join paths correctly" in { + + import za.co.absa.atum.utils.FileUtils.PathJoin + "/path/to" / "file" shouldBe "/path/to/file" + "/path/to/" / "file" shouldBe "/path/to/file" + "/path/to" / "/file" shouldBe "/path/to/file" + "/path/to/" / "/file" shouldBe "/path/to/file" + + } +} diff --git a/atum/src/test/scala/za/co/absa/atum/utils/S3UtilsSpec.scala b/atum/src/test/scala/za/co/absa/atum/utils/S3UtilsSpec.scala new file mode 100644 index 00000000..293e19f4 --- /dev/null +++ b/atum/src/test/scala/za/co/absa/atum/utils/S3UtilsSpec.scala @@ -0,0 +1,47 @@ +package za.co.absa.atum.utils + +import org.scalatest.flatspec.AnyFlatSpec +import software.amazon.awssdk.regions.Region +import za.co.absa.atum.persistence.S3Location +import S3Utils.StringS3LocationExt +import org.scalatest.matchers.should.Matchers + +class S3UtilsSpec extends AnyFlatSpec with Matchers { + + val region1 = Region.EU_WEST_1 + + val validPathsWithExpectedLocations = Seq( + // (path, expected parsed value) + ("s3://mybucket-123/path/to/file.ext", S3Location("mybucket-123", "path/to/file.ext", region1)), + ("s3n://mybucket-123/path/to/ends/with/slash/", S3Location("mybucket-123", "path/to/ends/with/slash/", region1)), + ("s3a://mybucket-123.asdf.cz/path-to-$_file!@#$.ext", S3Location("mybucket-123.asdf.cz", "path-to-$_file!@#$.ext", region1)) + ) + + val invalidPaths = Seq( + "s3x://mybucket-123/path/to/file/on/invalid/prefix", + "s3://bb/some/path/but/bucketname/too/short" + ) + + "S3Utils.StringS3LocationExt" should "parse S3 path from String using toS3Location" in { + validPathsWithExpectedLocations.foreach { case (path, expectedLocation) => + path.toS3Location(region1) shouldBe expectedLocation + } + } + + it should "fail 
parsing invalid S3 path from String using toS3Location" in { + invalidPaths.foreach { path => + assertThrows[IllegalArgumentException] { + path.toS3Location(region1) + } + } + } + + it should "check path using isValidS3Path" in { + validPathsWithExpectedLocations.map(_._1).foreach { path => + path.isValidS3Path shouldBe true + } + + invalidPaths.foreach(_.isValidS3Path shouldBe false) + } + +} diff --git a/examples/pom.xml b/examples/pom.xml index fc19dbaf..ffd960f0 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -24,7 +24,7 @@ za.co.absa atum-parent - 0.2.7-SNAPSHOT + 0.3.0-SNAPSHOT diff --git a/examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements1.scala b/examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements1.scala index 5c511a62..c6d639e9 100644 --- a/examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements1.scala +++ b/examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements1.scala @@ -42,5 +42,7 @@ object SampleMeasurements1 { .setCheckpoint("checkpoint1") .write.mode(SaveMode.Overwrite) .parquet("data/output/stage1_job_results") + + spark.disableControlMeasuresTracking() } } diff --git a/examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements2.scala b/examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements2.scala index 71531eac..5a8ec454 100644 --- a/examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements2.scala +++ b/examples/src/main/scala/za/co/absa/atum/examples/SampleMeasurements2.scala @@ -47,5 +47,7 @@ object SampleMeasurements2 { .setCheckpoint("checkpoint2") .write.mode(SaveMode.Overwrite) .parquet("data/output/stage2_job_results") + + spark.disableControlMeasuresTracking() } } diff --git a/examples/src/main/scala/za/co/absa/atum/examples/SampleS3Measurements1.scala b/examples/src/main/scala/za/co/absa/atum/examples/SampleS3Measurements1.scala new file mode 100644 index 00000000..ea51455f --- /dev/null +++ b/examples/src/main/scala/za/co/absa/atum/examples/SampleS3Measurements1.scala @@ -0,0 +1,56 @@ +/* + * Copyright 2018-2019 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package za.co.absa.atum.examples
+
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import software.amazon.awssdk.regions.Region
+import za.co.absa.atum.AtumImplicits._
+import za.co.absa.atum.persistence.S3Location
+import za.co.absa.atum.utils.S3Utils
+
+object SampleS3Measurements1 {
+  def main(args: Array[String]) {
+    val sparkBuilder = SparkSession.builder().appName("Sample S3 Measurements 1 Job")
+    val spark = sparkBuilder
+      // .master("local")
+      .getOrCreate()
+
+    import spark.implicits._
+
+    // This example relies on a local credentials profile named "saml" with access to the S3 location defined below
+    implicit val samlCredentialsProvider = S3Utils.getLocalProfileCredentialsProvider("saml")
+
+    // Initializing library to hook up to Apache Spark
+    spark.enableControlMeasuresTrackingForS3(
+      sourceS3Location = Some(S3Location("my-bucket", "atum/input/wikidata.csv.info", Region.EU_WEST_1)),
+      destinationS3Config = None
+    ).setControlMeasuresWorkflow("Job 1 S3")
+
+    // A business logic of a spark job ...
+
+    spark.read
+      .option("header", "true")
+      .option("inferSchema", "true")
+      .csv("data/input/wikidata.csv")
+      .as("source")
+      .filter($"total_response_size" > 1000)
+      .setCheckpoint("checkpoint1")
+      .write.mode(SaveMode.Overwrite)
+      .parquet("data/output_s3/stage1_job_results")
+
+    spark.disableControlMeasuresTracking()
+  }
+}
diff --git a/examples/src/main/scala/za/co/absa/atum/examples/SampleS3Measurements2.scala b/examples/src/main/scala/za/co/absa/atum/examples/SampleS3Measurements2.scala
new file mode 100644
index 00000000..947119f9
--- /dev/null
+++ b/examples/src/main/scala/za/co/absa/atum/examples/SampleS3Measurements2.scala
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2018-2019 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.atum.examples
+
+import org.apache.spark.sql.{SaveMode, SparkSession}
+import software.amazon.awssdk.regions.Region
+import za.co.absa.atum.AtumImplicits._
+import za.co.absa.atum.core.Atum
+import za.co.absa.atum.persistence.{S3KmsSettings, S3Location}
+import za.co.absa.atum.utils.S3Utils
+
+object SampleS3Measurements2 {
+  def main(args: Array[String]) {
+
+    // This example is intended to run AFTER SampleS3Measurements1, otherwise it will fail because its input data is missing
+
+    val sparkBuilder = SparkSession.builder().appName("Sample Measurements 2 Job")
+    //val spark = sparkBuilder.master("local").getOrCreate()
+    val spark = sparkBuilder.getOrCreate()
+    import spark.implicits._
+
+    // This example relies on a local credentials profile named "saml" with access to the S3 location defined below
+    // and on an explicitly defined KMS key ID
+    implicit val samlCredentialsProvider = S3Utils.getLocalProfileCredentialsProvider("saml")
+    val kmsKeyId = System.getenv("TOOLING_KMS_KEY_ID") // loaded from an environment variable so that it is not disclosed here
+    Atum.log.info(s"kmsKeyId from env loaded = ${kmsKeyId.take(10)}...")
+
+    // Initializing library to hook up to Apache Spark
+    // No need to specify datasetName and datasetVersion as it is stage 2 and they will be determined automatically
+    spark.enableControlMeasuresTrackingForS3(
+      sourceS3Location = None,
+      destinationS3Config = Some((
+        S3Location("my-bucket", "atum/output/wikidata.csv.info", Region.EU_WEST_1),
+        S3KmsSettings(kmsKeyId)
+      ))
+    ).setControlMeasuresWorkflow("Job 2")
+
+    val sourceDS = spark.read
+      .parquet("data/output_s3/stage1_job_results")
+
+    // A business logic of a spark job ...
+
+    // An example - a column rename
+    // If the renamed column is one of the control measurement columns, the rename needs to be registered in the Control Framework
+    sourceDS.as("target")
+      .withColumnRenamed("total_response_size", "trs") // Renaming the column
+      .registerColumnRename("total_response_size", "trs") // Registering the rename, from now on the new name for the column is 'trs'
+      .filter($"trs" > 1000)
+      .setCheckpoint("checkpoint2")
+      .write.mode(SaveMode.Overwrite)
+      .parquet("data/output_s3/stage2_job_results")
+
+    spark.disableControlMeasuresTracking()
+  }
+}
diff --git a/examples/src/main/scala/za/co/absa/atum/utils/SparkJobRunnerMethods.scala b/examples/src/main/scala/za/co/absa/atum/utils/SparkJobRunnerMethods.scala
index 2b2e2a53..37a3eab0 100644
--- a/examples/src/main/scala/za/co/absa/atum/utils/SparkJobRunnerMethods.scala
+++ b/examples/src/main/scala/za/co/absa/atum/utils/SparkJobRunnerMethods.scala
@@ -15,13 +15,13 @@
 package za.co.absa.atum.utils
 
-import org.scalatest.FunSuiteLike
+import org.scalatest.funsuite.AnyFunSuiteLike
 
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe
 
 trait SparkJobRunnerMethods {
-  this: FunSuiteLike =>
+  this: AnyFunSuiteLike =>
 
   private def runSparkJob[T](implicit ct: ClassTag[T]): Unit = {
     type MainClass = {def main(args: Array[String]): Unit}
diff --git a/examples/src/main/scala/za/co/absa/atum/utils/SparkLocalMaster.scala b/examples/src/main/scala/za/co/absa/atum/utils/SparkLocalMaster.scala
index 381a6374..2fb00c39 100644
--- a/examples/src/main/scala/za/co/absa/atum/utils/SparkLocalMaster.scala
+++ b/examples/src/main/scala/za/co/absa/atum/utils/SparkLocalMaster.scala
@@ -17,4 +17,8 @@ package za.co.absa.atum.utils
 trait SparkLocalMaster {
   System.getProperties.setProperty("spark.master", "local[*]")
+
+  // in order to run the SampleMeasurements examples as tests, otherwise
+  // java.lang.IllegalArgumentException: System memory 259522560 must be at least 471859200... is thrown
+  System.getProperties.setProperty("spark.testing.memory", (1024*1024*1024).toString) // 1g
 }
diff --git a/examples/src/test/scala/za/co/absa/atum/examples/SampleMeasurements2Runner.scala b/examples/src/test/scala/za/co/absa/atum/examples/SampleMeasurementsHdfsRunnerSpec.scala
similarity index 70%
rename from examples/src/test/scala/za/co/absa/atum/examples/SampleMeasurements2Runner.scala
rename to examples/src/test/scala/za/co/absa/atum/examples/SampleMeasurementsHdfsRunnerSpec.scala
index 9fe0b80a..7d803e69 100644
--- a/examples/src/test/scala/za/co/absa/atum/examples/SampleMeasurements2Runner.scala
+++ b/examples/src/test/scala/za/co/absa/atum/examples/SampleMeasurementsHdfsRunnerSpec.scala
@@ -15,11 +15,14 @@
 package za.co.absa.atum.examples
 
-import org.scalatest.FunSuite
+import org.scalatest.funsuite.AnyFunSuite
 import za.co.absa.atum.utils._
 
-class SampleMeasurements2Runner extends FunSuite
+class SampleMeasurementsHdfsRunnerSpec extends AnyFunSuite
   with SparkJobRunnerMethods
   with SparkLocalMaster {
-  runSparkJobAsTest[SampleMeasurements2.type]
+
+  // SampleMeasurements2 depends on SampleMeasurements1's output, so they must be run in this order
+  runSparkJobAsTest[SampleMeasurements1.type]
+  runSparkJobAsTest[SampleMeasurements2.type]
 }
diff --git a/examples/src/test/scala/za/co/absa/atum/examples/SampleMeasurements1Runner.scala b/examples/src/test/scala/za/co/absa/atum/examples/SampleMeasurementsS3RunnerSpec.scala
similarity index 78%
rename from examples/src/test/scala/za/co/absa/atum/examples/SampleMeasurements1Runner.scala
rename to examples/src/test/scala/za/co/absa/atum/examples/SampleMeasurementsS3RunnerSpec.scala
index f30d71ac..e43de762 100644
--- a/examples/src/test/scala/za/co/absa/atum/examples/SampleMeasurements1Runner.scala
+++ b/examples/src/test/scala/za/co/absa/atum/examples/SampleMeasurementsS3RunnerSpec.scala
@@ -15,11 +15,13 @@
 package za.co.absa.atum.examples
 
-import org.scalatest.FunSuite
+import org.scalatest.funsuite.AnyFunSuite
 import za.co.absa.atum.utils._
 
-class SampleMeasurements1Runner extends FunSuite
+class SampleMeasurementsS3RunnerSpec extends AnyFunSuite
   with SparkJobRunnerMethods
   with SparkLocalMaster {
-  runSparkJobAsTest[SampleMeasurements1.type]
+
+  runSparkJobAsTest[SampleS3Measurements1.type]
+  runSparkJobAsTest[SampleS3Measurements2.type]
 }
diff --git a/pom.xml b/pom.xml
index b9b25c22..34cabef9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,7 +20,7 @@
     za.co.absa
    atum-parent
-    0.2.7-SNAPSHOT
+    0.3.0-SNAPSHOT
     pom
@@ -97,16 +97,30 @@
     http://github.com/AbsaOSS/atum/tree/master
-    3.2.11
+    3.5.3
     2.11
     2.11.8
     1.0
-    2.2.4
+    3.2.2
     1.7.25
-    2.2.1
+    2.4.5
     2.4.16
+    2.13.65
+    1.15.0
+
+
+
+      software.amazon.awssdk
+      bom
+      ${aws.java.sdk.version}
+      pom
+      import
+
+
+
+
@@ -133,6 +147,10 @@
     json4s-ext_${scala.compat.version}
     ${json4s.version}
+
+      software.amazon.awssdk
+      s3
+