From 5fdcc76aedee738d71fb271cb1584b1e4f1a96a3 Mon Sep 17 00:00:00 2001 From: SamratMitra Date: Fri, 15 May 2020 12:25:42 +0530 Subject: [PATCH 1/8] BugFix - Incorrect calculation of Error % for ColumnBasedChecks --- .../target/data_validator/JsonEncoders.scala | 10 +++ .../data_validator/ValidatorEvent.scala | 10 +++ .../validator/ColumnBased.scala | 87 +++++++++++++------ .../data_validator/ValidatorBaseSpec.scala | 6 +- .../validator/ColumnBasedSpec.scala | 44 ++++++++-- 5 files changed, 125 insertions(+), 32 deletions(-) diff --git a/src/main/scala/com/target/data_validator/JsonEncoders.scala b/src/main/scala/com/target/data_validator/JsonEncoders.scala index e254f89..b478fc0 100644 --- a/src/main/scala/com/target/data_validator/JsonEncoders.scala +++ b/src/main/scala/com/target/data_validator/JsonEncoders.scala @@ -5,6 +5,8 @@ import com.typesafe.scalalogging.LazyLogging import io.circe._ import io.circe.syntax._ +import scala.collection.mutable.ListBuffer + object JsonEncoders extends LazyLogging { // Used by ValidatorQuickCheckError to make sure Json types are right. @@ -47,6 +49,14 @@ object JsonEncoders extends LazyLogging { ("count", Json.fromLong(vce.count)), ("errorCount", Json.fromLong(vce.errorCount)) ) + case cbvce: ColumnBasedValidatorCheckEvent => + val fields = new ListBuffer[Tuple2[String, Json]] + fields.append(("type", Json.fromString("columnBasedCheckEvent"))) + fields.append(("failed", Json.fromBoolean(cbvce.failed))) + fields.append(("message", Json.fromString(cbvce.msg))) + cbvce.data.foreach(x => fields.append((x._1, Json.fromString(x._2)))) + Json.fromFields(fields) + case qce: ValidatorQuickCheckError => Json.obj( ("type", Json.fromString("quickCheckError")), ("failed", Json.fromBoolean(qce.failed)), diff --git a/src/main/scala/com/target/data_validator/ValidatorEvent.scala b/src/main/scala/com/target/data_validator/ValidatorEvent.scala index 8978b01..c825d1e 100644 --- a/src/main/scala/com/target/data_validator/ValidatorEvent.scala +++ b/src/main/scala/com/target/data_validator/ValidatorEvent.scala @@ -36,6 +36,16 @@ case class ValidatorCheckEvent(failure: Boolean, label: String, count: Long, err } } +case class ColumnBasedValidatorCheckEvent(failure: Boolean, + data: List[Tuple2[String, String]], + msg: String) extends ValidatorEvent { + override def failed: Boolean = failure + + override def toHTML: Text.all.Tag = { + div(cls:="checkEvent")(failedHTML, s" - ${msg}") + } +} + class ValidatorTimer(val label: String) extends ValidatorEvent { var duration = 0L diff --git a/src/main/scala/com/target/data_validator/validator/ColumnBased.scala b/src/main/scala/com/target/data_validator/validator/ColumnBased.scala index fcc8d95..8e21aa6 100644 --- a/src/main/scala/com/target/data_validator/validator/ColumnBased.scala +++ b/src/main/scala/com/target/data_validator/validator/ColumnBased.scala @@ -1,6 +1,6 @@ package com.target.data_validator.validator -import com.target.data_validator.{ValidatorCheckEvent, ValidatorCounter, ValidatorError, VarSubstitution} +import com.target.data_validator.{ColumnBasedValidatorCheckEvent, ValidatorCounter, ValidatorError, VarSubstitution} import com.target.data_validator.JsonEncoders.eventEncoder import io.circe.Json import io.circe.syntax._ @@ -10,11 +10,20 @@ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.expressions.aggregate.Max import org.apache.spark.sql.types._ +import scala.collection.mutable.ListBuffer +import scala.math.abs + abstract class ColumnBased(column: String, condTest: 
Expression) extends CheapCheck { override def select(schema: StructType, dict: VarSubstitution): Expression = condTest // ColumnBased checks don't have per row error details. def hasQuickErrorDetails: Boolean = false + + // calculates and returns the pct error as a string + def calculatePctError(expected: Double, actual: Double, formatStr: String = "%4.2f%%"): String = { + val pct = abs(((expected - actual) * 100.0) / expected) + formatStr.format(pct) + } } case class MinNumRows(minNumRows: Long) extends ColumnBased("", ValidatorBase.L0) { @@ -36,8 +45,11 @@ case class MinNumRows(minNumRows: Long) extends ColumnBased("", ValidatorBase.L0 override def quickCheck(row: Row, count: Long, idx: Int): Boolean = { failed = count < minNumRows + val pctError = if (failed) calculatePctError(minNumRows, count) else "0.00" addEvent(ValidatorCounter("rowCount", count)) - addEvent(ValidatorCheckEvent(failed, s"MinNumRowCheck $minNumRows ", count, 1)) + val msg = s"MinNumRowsCheck Expected: ${minNumRows} Actual: ${count} Error %: ${pctError}" + val data = List(("Expected", minNumRows.toString), ("Actual", count.toString), ("Error Pct", pctError)) + addEvent(ColumnBasedValidatorCheckEvent(failed, data, msg)) failed } @@ -62,41 +74,66 @@ case class ColumnMaxCheck(column: String, value: Json) override def configCheck(df: DataFrame): Boolean = checkTypes(df, column, value) + // scalastyle:off override def quickCheck(row: Row, count: Long, idx: Int): Boolean = { val dataType = row.schema(idx).dataType val rMax = row(idx) logger.info(s"rMax: $rMax colType: $dataType value: $value valueClass: ${value.getClass.getCanonicalName}") - val num = value.asNumber + + var pctError = "0.00" + var errorMsg = "" + val data = new ListBuffer[Tuple2[String, String]] + failed = dataType match { - case StringType => value.asString.exists(_ != row.getString(idx)) - case ByteType => num.map(_.toByte).exists(_.get != row.getByte(idx)) - case ShortType => num.map(_.toShort).exists(_.get != row.getShort(idx)) - case IntegerType => - val intNum = value.asNumber.map(_.toInt.get).getOrElse(-1) - val rowInt = row.getInt(idx) - logger.debug(s"intNum[${intNum.getClass.getCanonicalName}]: $intNum " + - s"rowInt[${rowInt.getClass.getCanonicalName}]: $rowInt") - num.map(_.toInt).exists(_.get != row.getInt(idx)) - case LongType => num.map(_.toLong).exists(_.get != row.getLong(idx)) - case FloatType => num.forall(_.toDouble != row.getFloat(idx)) - case DoubleType => num.forall(_.toDouble != row.getDouble(idx)) - case ut => - logger.error(s"quickCheck for type: $ut, Row: $row not Implemented! 
Please file this as a bug.") + case StringType => { + val expected = value.asString.getOrElse("") + val actual = row.getString(idx) + data.appendAll(List(("Expected", expected), ("Actual", actual))) + errorMsg = s"ColumnMaxCheck $column[$dataType]: Expected: $expected, Actual: $actual" + expected != actual + } + case d:NumericType => { + val num = value.asNumber.get + var expected = 0.0 + var actual = 0.0 + d match { + case ByteType => + expected = num.toByte.getOrElse[Byte](-1) + actual = row.getByte(idx) + case ShortType => + expected = num.toShort.getOrElse[Short](-1) + actual = row.getShort(idx) + case IntegerType => + expected = num.toInt.getOrElse[Int](-1) + actual = row.getInt(idx) + case LongType => + expected = num.toLong.getOrElse[Long](-1) + actual = row.getLong(idx) + case FloatType => + expected = num.toDouble + actual = row.getFloat(idx) + case DoubleType => + expected = num.toDouble + actual = row.getDouble(idx) + } + pctError = if(expected != actual) calculatePctError(expected, actual) else "0.00" + data.appendAll(List(("Expected", num.toString), ("Actual", rMax.toString), ("Error Pct", pctError))) + errorMsg = s"ColumnMaxCheck $column[$dataType]: Expected: $num, Actual: $rMax. Error %: ${pctError}" + expected != actual + } + case ut => { + logger.error(s"ColumnMaxCheck for type: $ut, Row: $row not implemented! Please file this as a bug.") + errorMsg = s"ColumnMaxCheck is not supported for data type ${dataType}" true // Fail check! + } } logger.debug(s"MaxValue compared Row: $row with value: $value failed: $failed") if (failed) { - addEvent( - ValidatorCheckEvent( - failed, - s"columnMaxCheck column[$dataType]: $column value: $value doesn't equal $rMax", - count, - 1 - ) - ) + addEvent(ColumnBasedValidatorCheckEvent(failed, data.toList, errorMsg)) } failed } + // scalastyle:on override def toJson: Json = Json.obj( ("type", Json.fromString("columnMaxCheck")), diff --git a/src/test/scala/com/target/data_validator/ValidatorBaseSpec.scala b/src/test/scala/com/target/data_validator/ValidatorBaseSpec.scala index 5513e78..1847506 100644 --- a/src/test/scala/com/target/data_validator/ValidatorBaseSpec.scala +++ b/src/test/scala/com/target/data_validator/ValidatorBaseSpec.scala @@ -182,10 +182,14 @@ class ValidatorBaseSpec extends FunSpec with Matchers with TestingSparkSession { it("quickCheck() should fail when rowCount < minNumRows") { val dict = new VarSubstitution val df = spark.createDataFrame(sc.parallelize(List(Row("Doug", 50), Row("Collin", 32))), schema) //scalastyle:ignore - val config = mkConfig(df, List(MinNumRows(10))) //scalastyle:ignore + val minNumRowsCheck = MinNumRows(10) + val config = mkConfig(df, List(minNumRowsCheck)) //scalastyle:ignore assert(config.quickChecks(spark, dict)) assert(config.failed) assert(config.tables.head.failed) + assert(minNumRowsCheck.getEvents contains ColumnBasedValidatorCheckEvent(true, + List(("Expected", "10"), ("Actual", "2"), ("Error Pct", "80.00%")), + "MinNumRowsCheck Expected: 10 Actual: 2 Error %: 80.00%")) } it("quickCheck() should succeed when rowCount > minNumRows") { diff --git a/src/test/scala/com/target/data_validator/validator/ColumnBasedSpec.scala b/src/test/scala/com/target/data_validator/validator/ColumnBasedSpec.scala index a8e6867..6b0b4e2 100644 --- a/src/test/scala/com/target/data_validator/validator/ColumnBasedSpec.scala +++ b/src/test/scala/com/target/data_validator/validator/ColumnBasedSpec.scala @@ -17,14 +17,15 @@ class ColumnBasedSpec extends FunSpec with Matchers with TestingSparkSession { StructField("key", 
StringType), StructField("data", StringType), StructField("number", IntegerType), - StructField("byte", ByteType) + StructField("byte", ByteType), + StructField("double", DoubleType) ) ) val sampleData = List( - Row("one", "2018/10/01", 3, 10.toByte), - Row("two", "2018/10/02", 2, 20.toByte), - Row("three", "2018/10/31", 1, 30.toByte) + Row("one", "2018/10/01", 3, 10.toByte, 2.0), + Row("two", "2018/10/02", 2, 20.toByte, 3.5), + Row("three", "2018/10/31", 1, 30.toByte, 1.7) ) def mkValidatorConfig(checks: List[ValidatorBase]): ValidatorConfig = @@ -63,10 +64,15 @@ class ColumnBasedSpec extends FunSpec with Matchers with TestingSparkSession { it("should fail when value doesn't match max column value") { val dict = new VarSubstitution - val sut = mkValidatorConfig(List(ColumnMaxCheck("data", Json.fromString("2018/11/01")))) + val columnMaxCheck = ColumnMaxCheck("data", Json.fromString("2018/11/01")) + val sut = mkValidatorConfig(List(columnMaxCheck)) assert(!sut.configCheck(spark, dict)) assert(sut.quickChecks(spark, dict)) assert(sut.failed) + assert(columnMaxCheck.getEvents contains + ColumnBasedValidatorCheckEvent(true, + List(("Expected", "2018/11/01"), ("Actual", "2018/10/31")), + "ColumnMaxCheck data[StringType]: Expected: 2018/11/01, Actual: 2018/10/31")) } it("should not fail with numeric column matches max value") { @@ -79,10 +85,36 @@ class ColumnBasedSpec extends FunSpec with Matchers with TestingSparkSession { it("should fail when numeric column doesn't match max value") { val dict = new VarSubstitution - val sut = mkValidatorConfig(List(ColumnMaxCheck("number", Json.fromInt(100)))) // scalastyle:ignore + val columnMaxCheck = ColumnMaxCheck("number", Json.fromInt(100)) + val sut = mkValidatorConfig(List(columnMaxCheck)) // scalastyle:ignore assert(!sut.configCheck(spark, dict)) assert(sut.quickChecks(spark, dict)) assert(sut.failed) + assert(columnMaxCheck.getEvents contains + ColumnBasedValidatorCheckEvent(true, + List(("Expected", "100"), ("Actual", "3"), ("Error Pct", "97.00%")), + "ColumnMaxCheck number[IntegerType]: Expected: 100, Actual: 3. Error %: 97.00%")) + } + + it("should not fail when double column matches max value") { + val dict = new VarSubstitution + val sut = mkValidatorConfig(List(ColumnMaxCheck("double", Json.fromDouble(3.5).get))) + assert(!sut.configCheck(spark, dict)) + assert(!sut.quickChecks(spark, dict)) + assert(!sut.failed) + } + + it("should fail when double column doesn't match max value") { + val dict = new VarSubstitution + val columnMaxCheck = ColumnMaxCheck("double", Json.fromDouble(5.0).get) + val sut = mkValidatorConfig(List(columnMaxCheck)) // scalastyle:ignore + assert(!sut.configCheck(spark, dict)) + assert(sut.quickChecks(spark, dict)) + assert(sut.failed) + assert(columnMaxCheck.getEvents contains + ColumnBasedValidatorCheckEvent(true, + List(("Expected", "5.0"), ("Actual", "3.5"), ("Error Pct", "30.00%")), + "ColumnMaxCheck double[DoubleType]: Expected: 5.0, Actual: 3.5. 
Error %: 30.00%")) } it("should fail when byte column and value overflows") { From 28ede343f4a406cc42b7d81d0da1e2c9551f0ea3 Mon Sep 17 00:00:00 2001 From: SamratMitra Date: Mon, 18 May 2020 20:43:39 +0530 Subject: [PATCH 2/8] Address Review Comments --- .../target/data_validator/JsonEncoders.scala | 16 +-- .../data_validator/ValidatorEvent.scala | 2 +- .../validator/ColumnBased.scala | 109 ++++++++++-------- .../data_validator/ValidatorBaseSpec.scala | 9 +- .../validator/ColumnBasedSpec.scala | 21 +++- 5 files changed, 90 insertions(+), 67 deletions(-) diff --git a/src/main/scala/com/target/data_validator/JsonEncoders.scala b/src/main/scala/com/target/data_validator/JsonEncoders.scala index b478fc0..838e935 100644 --- a/src/main/scala/com/target/data_validator/JsonEncoders.scala +++ b/src/main/scala/com/target/data_validator/JsonEncoders.scala @@ -5,8 +5,6 @@ import com.typesafe.scalalogging.LazyLogging import io.circe._ import io.circe.syntax._ -import scala.collection.mutable.ListBuffer - object JsonEncoders extends LazyLogging { // Used by ValidatorQuickCheckError to make sure Json types are right. @@ -49,14 +47,12 @@ object JsonEncoders extends LazyLogging { ("count", Json.fromLong(vce.count)), ("errorCount", Json.fromLong(vce.errorCount)) ) - case cbvce: ColumnBasedValidatorCheckEvent => - val fields = new ListBuffer[Tuple2[String, Json]] - fields.append(("type", Json.fromString("columnBasedCheckEvent"))) - fields.append(("failed", Json.fromBoolean(cbvce.failed))) - fields.append(("message", Json.fromString(cbvce.msg))) - cbvce.data.foreach(x => fields.append((x._1, Json.fromString(x._2)))) - Json.fromFields(fields) - + case cbvce: ColumnBasedValidatorCheckEvent => Json.obj( + ("type", Json.fromString("columnBasedCheckEvent")), + ("failed", Json.fromBoolean(cbvce.failed)), + ("message", Json.fromString(cbvce.msg)), + ("data", Json.fromFields(cbvce.data.map(x => (x._1, Json.fromString(x._2))))) + ) case qce: ValidatorQuickCheckError => Json.obj( ("type", Json.fromString("quickCheckError")), ("failed", Json.fromBoolean(qce.failed)), diff --git a/src/main/scala/com/target/data_validator/ValidatorEvent.scala b/src/main/scala/com/target/data_validator/ValidatorEvent.scala index c825d1e..7437168 100644 --- a/src/main/scala/com/target/data_validator/ValidatorEvent.scala +++ b/src/main/scala/com/target/data_validator/ValidatorEvent.scala @@ -37,7 +37,7 @@ case class ValidatorCheckEvent(failure: Boolean, label: String, count: Long, err } case class ColumnBasedValidatorCheckEvent(failure: Boolean, - data: List[Tuple2[String, String]], + data: Map[String, String], msg: String) extends ValidatorEvent { override def failed: Boolean = failure diff --git a/src/main/scala/com/target/data_validator/validator/ColumnBased.scala b/src/main/scala/com/target/data_validator/validator/ColumnBased.scala index 8e21aa6..7f764be 100644 --- a/src/main/scala/com/target/data_validator/validator/ColumnBased.scala +++ b/src/main/scala/com/target/data_validator/validator/ColumnBased.scala @@ -10,7 +10,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.expressions.aggregate.Max import org.apache.spark.sql.types._ -import scala.collection.mutable.ListBuffer +import scala.collection.mutable.LinkedHashMap import scala.math.abs abstract class ColumnBased(column: String, condTest: Expression) extends CheapCheck { @@ -21,8 +21,20 @@ abstract class ColumnBased(column: String, condTest: Expression) extends CheapCh // calculates and returns the pct error as a string def 
calculatePctError(expected: Double, actual: Double, formatStr: String = "%4.2f%%"): String = { - val pct = abs(((expected - actual) * 100.0) / expected) - formatStr.format(pct) + var pct_error_str = "" + + if (expected == actual) { + pct_error_str = formatStr.format(0.00) // if expected == actual, error % should be 0, even if expected is 0 + } + else if (expected == 0.0) { + pct_error_str = "undefined" + } + else { + val pct = abs(((expected - actual) * 100.0) / expected) + pct_error_str = formatStr.format(pct) + } + + pct_error_str } } @@ -45,11 +57,11 @@ case class MinNumRows(minNumRows: Long) extends ColumnBased("", ValidatorBase.L0 override def quickCheck(row: Row, count: Long, idx: Int): Boolean = { failed = count < minNumRows - val pctError = if (failed) calculatePctError(minNumRows, count) else "0.00" + val pctError = if (failed) calculatePctError(minNumRows, count) else "0.00%" addEvent(ValidatorCounter("rowCount", count)) val msg = s"MinNumRowsCheck Expected: ${minNumRows} Actual: ${count} Error %: ${pctError}" - val data = List(("Expected", minNumRows.toString), ("Actual", count.toString), ("Error Pct", pctError)) - addEvent(ColumnBasedValidatorCheckEvent(failed, data, msg)) + val data = LinkedHashMap("expected" -> minNumRows.toString, "actual" -> count.toString, "error_percent" -> pctError) + addEvent(ColumnBasedValidatorCheckEvent(failed, data.toMap, msg)) failed } @@ -80,56 +92,51 @@ case class ColumnMaxCheck(column: String, value: Json) val rMax = row(idx) logger.info(s"rMax: $rMax colType: $dataType value: $value valueClass: ${value.getClass.getCanonicalName}") - var pctError = "0.00" var errorMsg = "" - val data = new ListBuffer[Tuple2[String, String]] - - failed = dataType match { - case StringType => { - val expected = value.asString.getOrElse("") - val actual = row.getString(idx) - data.appendAll(List(("Expected", expected), ("Actual", actual))) - errorMsg = s"ColumnMaxCheck $column[$dataType]: Expected: $expected, Actual: $actual" - expected != actual - } - case d:NumericType => { - val num = value.asNumber.get - var expected = 0.0 - var actual = 0.0 - d match { - case ByteType => - expected = num.toByte.getOrElse[Byte](-1) - actual = row.getByte(idx) - case ShortType => - expected = num.toShort.getOrElse[Short](-1) - actual = row.getShort(idx) - case IntegerType => - expected = num.toInt.getOrElse[Int](-1) - actual = row.getInt(idx) - case LongType => - expected = num.toLong.getOrElse[Long](-1) - actual = row.getLong(idx) - case FloatType => - expected = num.toDouble - actual = row.getFloat(idx) - case DoubleType => - expected = num.toDouble - actual = row.getDouble(idx) - } - pctError = if(expected != actual) calculatePctError(expected, actual) else "0.00" - data.appendAll(List(("Expected", num.toString), ("Actual", rMax.toString), ("Error Pct", pctError))) - errorMsg = s"ColumnMaxCheck $column[$dataType]: Expected: $num, Actual: $rMax. Error %: ${pctError}" - expected != actual - } - case ut => { - logger.error(s"ColumnMaxCheck for type: $ut, Row: $row not implemented! Please file this as a bug.") - errorMsg = s"ColumnMaxCheck is not supported for data type ${dataType}" - true // Fail check! 
+ val data = new LinkedHashMap[String, String] + + val resultForString = () => { + val (expected, actual) = (value.asString.getOrElse(""), row.getString(idx)) + + failed = expected != actual + data += ("expected" -> expected, "actual" -> actual) + errorMsg = s"ColumnMaxCheck $column[StringType]: Expected: $expected, Actual: $actual" + } + + val resultForNumeric = () => { + val num = value.asNumber.get + var cmp_params = (0.0, 0.0) // (expected, actual) + + dataType match { + case ByteType => cmp_params = (num.toByte.getOrElse[Byte](-1), row.getByte(idx)) + case ShortType => cmp_params = (num.toShort.getOrElse[Short](-1), row.getShort(idx)) + case IntegerType => cmp_params = (num.toInt.getOrElse[Int](-1), row.getInt(idx)) + case LongType => cmp_params = (num.toLong.getOrElse[Long](-1), row.getLong(idx)) + case FloatType => cmp_params = (num.toDouble, row.getFloat(idx)) + case DoubleType => cmp_params = (num.toDouble, row.getDouble(idx)) } + + failed = cmp_params._1 != cmp_params._2 + val pctError = if(failed) calculatePctError(cmp_params._1, cmp_params._2) else "0.00%" + data += ("expected" -> num.toString, "actual" -> rMax.toString, "error_percent" -> pctError) + errorMsg = s"ColumnMaxCheck $column[$dataType]: Expected: $num, Actual: $rMax. Error %: ${pctError}" } + + val resultForOther = () => { + logger.error(s"ColumnMaxCheck for type: $dataType, Row: $row not implemented! Please open a bug report on the data-validator issue tracker.") + failed = true + errorMsg = s"ColumnMaxCheck is not supported for data type $dataType" + } + + dataType match { + case StringType => resultForString() + case _:NumericType => resultForNumeric() + case _ => resultForOther() + } + logger.debug(s"MaxValue compared Row: $row with value: $value failed: $failed") if (failed) { - addEvent(ColumnBasedValidatorCheckEvent(failed, data.toList, errorMsg)) + addEvent(ColumnBasedValidatorCheckEvent(failed, data.toMap, errorMsg)) } failed } diff --git a/src/test/scala/com/target/data_validator/ValidatorBaseSpec.scala b/src/test/scala/com/target/data_validator/ValidatorBaseSpec.scala index 1847506..785357e 100644 --- a/src/test/scala/com/target/data_validator/ValidatorBaseSpec.scala +++ b/src/test/scala/com/target/data_validator/ValidatorBaseSpec.scala @@ -10,6 +10,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.types._ import org.scalatest._ +import scala.collection.mutable.LinkedHashMap import scala.util.Random class ValidatorBaseSpec extends FunSpec with Matchers with TestingSparkSession { @@ -188,18 +189,22 @@ class ValidatorBaseSpec extends FunSpec with Matchers with TestingSparkSession { assert(config.failed) assert(config.tables.head.failed) assert(minNumRowsCheck.getEvents contains ColumnBasedValidatorCheckEvent(true, - List(("Expected", "10"), ("Actual", "2"), ("Error Pct", "80.00%")), + LinkedHashMap("expected" -> "10", "actual" -> "2", "error_percent" -> "80.00%").toMap, "MinNumRowsCheck Expected: 10 Actual: 2 Error %: 80.00%")) } it("quickCheck() should succeed when rowCount > minNumRows") { val dict = new VarSubstitution val df = spark.createDataFrame(sc.parallelize(List(Row("Doug", 50), Row("Collin", 32))), schema) //scalastyle:ignore - val config = mkConfig(df, List(MinNumRows(1))) //scalastyle:ignore + val minNumRowsCheck = MinNumRows(1) + val config = mkConfig(df, List(minNumRowsCheck)) //scalastyle:ignore assert(!config.configCheck(spark, dict)) assert(!config.quickChecks(spark, dict)) assert(!config.failed) assert(!config.tables.exists(_.failed)) + 
assert(minNumRowsCheck.getEvents contains ColumnBasedValidatorCheckEvent(false, + LinkedHashMap("expected" -> "1", "actual" -> "2", "error_percent" -> "0.00%").toMap, + "MinNumRowsCheck Expected: 1 Actual: 2 Error %: 0.00%")) } } diff --git a/src/test/scala/com/target/data_validator/validator/ColumnBasedSpec.scala b/src/test/scala/com/target/data_validator/validator/ColumnBasedSpec.scala index 6b0b4e2..7c17c38 100644 --- a/src/test/scala/com/target/data_validator/validator/ColumnBasedSpec.scala +++ b/src/test/scala/com/target/data_validator/validator/ColumnBasedSpec.scala @@ -8,6 +8,8 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.types._ import org.scalatest._ +import scala.collection.mutable.LinkedHashMap + class ColumnBasedSpec extends FunSpec with Matchers with TestingSparkSession { describe("columnMaxCheck") { @@ -71,7 +73,7 @@ class ColumnBasedSpec extends FunSpec with Matchers with TestingSparkSession { assert(sut.failed) assert(columnMaxCheck.getEvents contains ColumnBasedValidatorCheckEvent(true, - List(("Expected", "2018/11/01"), ("Actual", "2018/10/31")), + LinkedHashMap("expected" -> "2018/11/01", "actual" -> "2018/10/31").toMap, "ColumnMaxCheck data[StringType]: Expected: 2018/11/01, Actual: 2018/10/31")) } @@ -92,10 +94,23 @@ class ColumnBasedSpec extends FunSpec with Matchers with TestingSparkSession { assert(sut.failed) assert(columnMaxCheck.getEvents contains ColumnBasedValidatorCheckEvent(true, - List(("Expected", "100"), ("Actual", "3"), ("Error Pct", "97.00%")), + LinkedHashMap("expected" -> "100", "actual" -> "3", "error_percent" -> "97.00%").toMap, "ColumnMaxCheck number[IntegerType]: Expected: 100, Actual: 3. Error %: 97.00%")) } + it("should fail with undefined error % when numeric column doesn't match max value and expected value is 0") { + val dict = new VarSubstitution + val columnMaxCheck = ColumnMaxCheck("number", Json.fromInt(0)) + val sut = mkValidatorConfig(List(columnMaxCheck)) // scalastyle:ignore + assert(!sut.configCheck(spark, dict)) + assert(sut.quickChecks(spark, dict)) + assert(sut.failed) + assert(columnMaxCheck.getEvents contains + ColumnBasedValidatorCheckEvent(true, + LinkedHashMap("expected" -> "0", "actual" -> "3", "error_percent" -> "undefined").toMap, + "ColumnMaxCheck number[IntegerType]: Expected: 0, Actual: 3. Error %: undefined")) + } + it("should not fail when double column matches max value") { val dict = new VarSubstitution val sut = mkValidatorConfig(List(ColumnMaxCheck("double", Json.fromDouble(3.5).get))) @@ -113,7 +128,7 @@ class ColumnBasedSpec extends FunSpec with Matchers with TestingSparkSession { assert(sut.failed) assert(columnMaxCheck.getEvents contains ColumnBasedValidatorCheckEvent(true, - List(("Expected", "5.0"), ("Actual", "3.5"), ("Error Pct", "30.00%")), + LinkedHashMap("expected" -> "5.0", "actual" -> "3.5", "error_percent" -> "30.00%").toMap, "ColumnMaxCheck double[DoubleType]: Expected: 5.0, Actual: 3.5. 
Error %: 30.00%")) } From 64da1ebb2f51d9c137bb028603b02f1ef751752e Mon Sep 17 00:00:00 2001 From: SamratMitra Date: Thu, 21 May 2020 18:34:36 +0530 Subject: [PATCH 3/8] Minor review comments --- .../data_validator/validator/ColumnBased.scala | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/src/main/scala/com/target/data_validator/validator/ColumnBased.scala b/src/main/scala/com/target/data_validator/validator/ColumnBased.scala index 7f764be..06d90be 100644 --- a/src/main/scala/com/target/data_validator/validator/ColumnBased.scala +++ b/src/main/scala/com/target/data_validator/validator/ColumnBased.scala @@ -21,20 +21,17 @@ abstract class ColumnBased(column: String, condTest: Expression) extends CheapCh // calculates and returns the pct error as a string def calculatePctError(expected: Double, actual: Double, formatStr: String = "%4.2f%%"): String = { - var pct_error_str = "" if (expected == actual) { - pct_error_str = formatStr.format(0.00) // if expected == actual, error % should be 0, even if expected is 0 + formatStr.format(0.00) // if expected == actual, error % should be 0, even if expected is 0 } else if (expected == 0.0) { - pct_error_str = "undefined" + "undefined" } else { val pct = abs(((expected - actual) * 100.0) / expected) - pct_error_str = formatStr.format(pct) + formatStr.format(pct) } - - pct_error_str } } @@ -93,9 +90,9 @@ case class ColumnMaxCheck(column: String, value: Json) logger.info(s"rMax: $rMax colType: $dataType value: $value valueClass: ${value.getClass.getCanonicalName}") var errorMsg = "" - val data = new LinkedHashMap[String, String] + val data = LinkedHashMap.empty[String, String] - val resultForString = () => { + def resultForString(): Unit = { val (expected, actual) = (value.asString.getOrElse(""), row.getString(idx)) failed = expected != actual @@ -103,7 +100,7 @@ case class ColumnMaxCheck(column: String, value: Json) errorMsg = s"ColumnMaxCheck $column[StringType]: Expected: $expected, Actual: $actual" } - val resultForNumeric = () => { + def resultForNumeric(): Unit = { val num = value.asNumber.get var cmp_params = (0.0, 0.0) // (expected, actual) @@ -122,7 +119,7 @@ case class ColumnMaxCheck(column: String, value: Json) errorMsg = s"ColumnMaxCheck $column[$dataType]: Expected: $num, Actual: $rMax. Error %: ${pctError}" } - val resultForOther = () => { + def resultForOther(): Unit = { logger.error(s"ColumnMaxCheck for type: $dataType, Row: $row not implemented! Please open a bug report on the data-validator issue tracker.") failed = true errorMsg = s"ColumnMaxCheck is not supported for data type $dataType" From dd8738eead719d152b2d5a48bf12b03a953a64ad Mon Sep 17 00:00:00 2001 From: SamratMitra Date: Tue, 26 May 2020 12:48:27 +0530 Subject: [PATCH 4/8] Incorrect error pct fix for columnSumCheck and other minor review comments --- README.md | 2 + .../data_validator/ValidatorEvent.scala | 2 +- .../validator/ColumnBased.scala | 24 ++++--- .../validator/ColumnSumCheck.scala | 71 +++++++++++++++---- .../target/data_validator/TestHelpers.scala | 1 + .../data_validator/ValidatorBaseSpec.scala | 10 +-- .../validator/ColumnBasedSpec.scala | 16 ++--- .../validator/ColumnSumCheckSpec.scala | 47 +++++++++++- 8 files changed, 132 insertions(+), 41 deletions(-) diff --git a/README.md b/README.md index 2dc3a0e..aaad15a 100644 --- a/README.md +++ b/README.md @@ -324,6 +324,8 @@ This check sums a column in all rows. If the sum applied to the `column` doesn't | `maxValue` | NumericType | The upper bound of the sum. 
Type depends on the type of the `column`. | | `inclusive` | Boolean | Include `minValue` and `maxValue` as part of the range. | +*Note:** If bounds are non-inclusive, and the actual sum is eequal to one of the bounds, the relative error percentage will be undefined. + ## Example Config ```yaml diff --git a/src/main/scala/com/target/data_validator/ValidatorEvent.scala b/src/main/scala/com/target/data_validator/ValidatorEvent.scala index 7437168..65ec20e 100644 --- a/src/main/scala/com/target/data_validator/ValidatorEvent.scala +++ b/src/main/scala/com/target/data_validator/ValidatorEvent.scala @@ -42,7 +42,7 @@ case class ColumnBasedValidatorCheckEvent(failure: Boolean, override def failed: Boolean = failure override def toHTML: Text.all.Tag = { - div(cls:="checkEvent")(failedHTML, s" - ${msg}") + div(cls:="checkEvent")(failedHTML, s" - $msg") } } diff --git a/src/main/scala/com/target/data_validator/validator/ColumnBased.scala b/src/main/scala/com/target/data_validator/validator/ColumnBased.scala index 06d90be..842ee4a 100644 --- a/src/main/scala/com/target/data_validator/validator/ColumnBased.scala +++ b/src/main/scala/com/target/data_validator/validator/ColumnBased.scala @@ -10,7 +10,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.expressions.aggregate.Max import org.apache.spark.sql.types._ -import scala.collection.mutable.LinkedHashMap +import scala.collection.mutable.ListMap import scala.math.abs abstract class ColumnBased(column: String, condTest: Expression) extends CheapCheck { @@ -56,8 +56,9 @@ case class MinNumRows(minNumRows: Long) extends ColumnBased("", ValidatorBase.L0 failed = count < minNumRows val pctError = if (failed) calculatePctError(minNumRows, count) else "0.00%" addEvent(ValidatorCounter("rowCount", count)) - val msg = s"MinNumRowsCheck Expected: ${minNumRows} Actual: ${count} Error %: ${pctError}" - val data = LinkedHashMap("expected" -> minNumRows.toString, "actual" -> count.toString, "error_percent" -> pctError) + val msg = s"MinNumRowsCheck Expected: $minNumRows Actual: $count Relative Error: $pctError" + val data = ListMap("expected" -> minNumRows.toString, "actual" -> count.toString, + "relative_error" -> pctError) addEvent(ColumnBasedValidatorCheckEvent(failed, data.toMap, msg)) failed } @@ -83,14 +84,13 @@ case class ColumnMaxCheck(column: String, value: Json) override def configCheck(df: DataFrame): Boolean = checkTypes(df, column, value) - // scalastyle:off override def quickCheck(row: Row, count: Long, idx: Int): Boolean = { val dataType = row.schema(idx).dataType val rMax = row(idx) logger.info(s"rMax: $rMax colType: $dataType value: $value valueClass: ${value.getClass.getCanonicalName}") var errorMsg = "" - val data = LinkedHashMap.empty[String, String] + val data = ListMap.empty[String, String] def resultForString(): Unit = { val (expected, actual) = (value.asString.getOrElse(""), row.getString(idx)) @@ -114,20 +114,23 @@ case class ColumnMaxCheck(column: String, value: Json) } failed = cmp_params._1 != cmp_params._2 - val pctError = if(failed) calculatePctError(cmp_params._1, cmp_params._2) else "0.00%" - data += ("expected" -> num.toString, "actual" -> rMax.toString, "error_percent" -> pctError) - errorMsg = s"ColumnMaxCheck $column[$dataType]: Expected: $num, Actual: $rMax. 
Error %: ${pctError}" + val pctError = if (failed) calculatePctError(cmp_params._1, cmp_params._2) else "0.00%" + data += ("expected" -> num.toString, "actual" -> rMax.toString, "relative_error" -> pctError) + errorMsg = s"ColumnMaxCheck $column[$dataType]: Expected: $num, Actual: $rMax. Relative Error: ${pctError}" } def resultForOther(): Unit = { - logger.error(s"ColumnMaxCheck for type: $dataType, Row: $row not implemented! Please open a bug report on the data-validator issue tracker.") + logger.error( + s"""ColumnMaxCheck for type: $dataType, Row: $row not implemented! + |Please open a bug report on the data-validator issue tracker.""".stripMargin + ) failed = true errorMsg = s"ColumnMaxCheck is not supported for data type $dataType" } dataType match { case StringType => resultForString() - case _:NumericType => resultForNumeric() + case _: NumericType => resultForNumeric() case _ => resultForOther() } @@ -137,7 +140,6 @@ case class ColumnMaxCheck(column: String, value: Json) } failed } - // scalastyle:on override def toJson: Json = Json.obj( ("type", Json.fromString("columnMaxCheck")), diff --git a/src/main/scala/com/target/data_validator/validator/ColumnSumCheck.scala b/src/main/scala/com/target/data_validator/validator/ColumnSumCheck.scala index 94340de..88c303e 100644 --- a/src/main/scala/com/target/data_validator/validator/ColumnSumCheck.scala +++ b/src/main/scala/com/target/data_validator/validator/ColumnSumCheck.scala @@ -1,13 +1,16 @@ package com.target.data_validator.validator -import com.target.data_validator.{ValidatorCheckEvent, ValidatorError, VarSubstitution} +import com.target.data_validator.{ColumnBasedValidatorCheckEvent, JsonEncoders, ValidatorError, VarSubstitution} import io.circe._ import io.circe.generic.semiauto._ +import io.circe.syntax._ import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions.aggregate.Sum import org.apache.spark.sql.types._ +import scala.collection.mutable.ListMap + case class ColumnSumCheck( column: String, minValue: Option[Json] = None, @@ -52,28 +55,66 @@ case class ColumnSumCheck( override def quickCheck(r: Row, count: Long, idx: Int): Boolean = { + val dataType = r.schema(idx).dataType + val isInclusive = inclusiveBounds.right.get + val lowerBoundValue = lowerBound.right.get + val upperBoundValue = upperBound.right.get + def evaluate(sum: Double): Boolean = { - if (inclusiveBounds.right.get) { sum > upperBound.right.get || sum < lowerBound.right.get} - else { sum >= upperBound.right.get || sum <= lowerBound.right.get} + if (isInclusive) { sum > upperBoundValue || sum < lowerBoundValue} + else { sum >= upperBoundValue || sum <= lowerBoundValue} } - failed = r.schema(idx).dataType match { - case ShortType => evaluate(r.getShort(idx)) - case IntegerType => evaluate(r.getInt(idx)) - case LongType => evaluate(r.getLong(idx)) - case FloatType => evaluate(r.getFloat(idx)) - case DoubleType => evaluate(r.getDouble(idx)) + def getPctError(sum: Double): String = { + if (sum < lowerBoundValue) { + calculatePctError(lowerBoundValue, sum) + } + else if (sum > upperBoundValue) { + calculatePctError(upperBoundValue, sum) + } + else if (!isInclusive && (sum == upperBoundValue || sum == lowerBoundValue)) { + "undefined" + } + else { + "0.00%" + } + } + + def getData(actualSum: Double, pctError: String): ListMap[String, String] = { + val data = (minValue, maxValue) match { + case (Some(x), Some(y)) => + ListMap("lower_bound" -> x.asNumber.get.toString, 
"upper_bound" -> y.asNumber.get.toString) + case (None, Some(y)) => ListMap("upper_bound" -> y.asNumber.get.toString) + case (Some(x), None) => ListMap("lower_bound" -> x.asNumber.get.toString) + case (None, None) => throw new RuntimeException("Must define at least one of minValue or maxValue.") + } + + data += ("inclusive" -> isInclusive.toString, "actual" -> r(idx).toString, "relative_error" -> pctError) + } + + val actualSum: Double = dataType match { + case ByteType => r.getByte(idx) + case ShortType => r.getShort(idx) + case IntegerType => r.getInt(idx) + case LongType => r.getLong(idx) + case FloatType => r.getFloat(idx) + case DoubleType => r.getDouble(idx) case ut => throw new Exception(s"Unsupported type for $name found in schema: $ut") } + failed = evaluate(actualSum) + val pctError = getPctError(actualSum) + val data = getData(actualSum, pctError) + val bounds = minValue.getOrElse("") :: maxValue.getOrElse("") :: Nil val prettyBounds = if (inclusiveBounds.right.get) { - r.get(idx) + " in " + bounds.mkString("[", " , ", "]") + bounds.mkString("[", " , ", "]") } else { - r.get(idx) + " in " + bounds.mkString("(", " , ", ")") + bounds.mkString("(", " , ", ")") } - val errorValue = if (failed) 1 else 0 - addEvent(ValidatorCheckEvent(failed, s"$name on '$column': $prettyBounds", count, errorValue)) + + val msg = s"$name on $column[$dataType]: Expected Range: $prettyBounds Actual: ${r(idx)} Relative Error: $pctError" + addEvent(ColumnBasedValidatorCheckEvent(failed, data.toMap, msg)) failed } @@ -118,9 +159,11 @@ case class ColumnSumCheck( } override def toJson: Json = { + import JsonEncoders.eventEncoder val additionalFieldsForReport = Json.fromFields(Set( "type" -> Json.fromString("columnSumCheck"), - "failed" -> Json.fromBoolean(failed) + "failed" -> Json.fromBoolean(failed), + "events" -> getEvents.asJson )) val base = ColumnSumCheck.encoder(this) diff --git a/src/test/scala/com/target/data_validator/TestHelpers.scala b/src/test/scala/com/target/data_validator/TestHelpers.scala index 1277f8f..d43a356 100644 --- a/src/test/scala/com/target/data_validator/TestHelpers.scala +++ b/src/test/scala/com/target/data_validator/TestHelpers.scala @@ -25,6 +25,7 @@ object TestHelpers { case "java.lang.Double" => DoubleType case "java.lang.Boolean" => BooleanType case "java.lang.Long" => LongType + case "java.lang.Byte" => ByteType case _ => throw new IllegalArgumentException(s"Unknown type '${v.getClass.getCanonicalName}'") } diff --git a/src/test/scala/com/target/data_validator/ValidatorBaseSpec.scala b/src/test/scala/com/target/data_validator/ValidatorBaseSpec.scala index 785357e..cf92348 100644 --- a/src/test/scala/com/target/data_validator/ValidatorBaseSpec.scala +++ b/src/test/scala/com/target/data_validator/ValidatorBaseSpec.scala @@ -10,7 +10,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.types._ import org.scalatest._ -import scala.collection.mutable.LinkedHashMap +import scala.collection.mutable.ListMap import scala.util.Random class ValidatorBaseSpec extends FunSpec with Matchers with TestingSparkSession { @@ -189,8 +189,8 @@ class ValidatorBaseSpec extends FunSpec with Matchers with TestingSparkSession { assert(config.failed) assert(config.tables.head.failed) assert(minNumRowsCheck.getEvents contains ColumnBasedValidatorCheckEvent(true, - LinkedHashMap("expected" -> "10", "actual" -> "2", "error_percent" -> "80.00%").toMap, - "MinNumRowsCheck Expected: 10 Actual: 2 Error %: 80.00%")) + ListMap("expected" -> "10", "actual" -> "2", 
"relative_error" -> "80.00%").toMap, + "MinNumRowsCheck Expected: 10 Actual: 2 Relative Error: 80.00%")) } it("quickCheck() should succeed when rowCount > minNumRows") { @@ -203,8 +203,8 @@ class ValidatorBaseSpec extends FunSpec with Matchers with TestingSparkSession { assert(!config.failed) assert(!config.tables.exists(_.failed)) assert(minNumRowsCheck.getEvents contains ColumnBasedValidatorCheckEvent(false, - LinkedHashMap("expected" -> "1", "actual" -> "2", "error_percent" -> "0.00%").toMap, - "MinNumRowsCheck Expected: 1 Actual: 2 Error %: 0.00%")) + ListMap("expected" -> "1", "actual" -> "2", "relative_error" -> "0.00%").toMap, + "MinNumRowsCheck Expected: 1 Actual: 2 Relative Error: 0.00%")) } } diff --git a/src/test/scala/com/target/data_validator/validator/ColumnBasedSpec.scala b/src/test/scala/com/target/data_validator/validator/ColumnBasedSpec.scala index 7c17c38..a032f87 100644 --- a/src/test/scala/com/target/data_validator/validator/ColumnBasedSpec.scala +++ b/src/test/scala/com/target/data_validator/validator/ColumnBasedSpec.scala @@ -8,7 +8,7 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.types._ import org.scalatest._ -import scala.collection.mutable.LinkedHashMap +import scala.collection.mutable.ListMap class ColumnBasedSpec extends FunSpec with Matchers with TestingSparkSession { @@ -73,7 +73,7 @@ class ColumnBasedSpec extends FunSpec with Matchers with TestingSparkSession { assert(sut.failed) assert(columnMaxCheck.getEvents contains ColumnBasedValidatorCheckEvent(true, - LinkedHashMap("expected" -> "2018/11/01", "actual" -> "2018/10/31").toMap, + ListMap("expected" -> "2018/11/01", "actual" -> "2018/10/31").toMap, "ColumnMaxCheck data[StringType]: Expected: 2018/11/01, Actual: 2018/10/31")) } @@ -94,8 +94,8 @@ class ColumnBasedSpec extends FunSpec with Matchers with TestingSparkSession { assert(sut.failed) assert(columnMaxCheck.getEvents contains ColumnBasedValidatorCheckEvent(true, - LinkedHashMap("expected" -> "100", "actual" -> "3", "error_percent" -> "97.00%").toMap, - "ColumnMaxCheck number[IntegerType]: Expected: 100, Actual: 3. Error %: 97.00%")) + ListMap("expected" -> "100", "actual" -> "3", "relative_error" -> "97.00%").toMap, + "ColumnMaxCheck number[IntegerType]: Expected: 100, Actual: 3. Relative Error: 97.00%")) } it("should fail with undefined error % when numeric column doesn't match max value and expected value is 0") { @@ -107,8 +107,8 @@ class ColumnBasedSpec extends FunSpec with Matchers with TestingSparkSession { assert(sut.failed) assert(columnMaxCheck.getEvents contains ColumnBasedValidatorCheckEvent(true, - LinkedHashMap("expected" -> "0", "actual" -> "3", "error_percent" -> "undefined").toMap, - "ColumnMaxCheck number[IntegerType]: Expected: 0, Actual: 3. Error %: undefined")) + ListMap("expected" -> "0", "actual" -> "3", "relative_error" -> "undefined").toMap, + "ColumnMaxCheck number[IntegerType]: Expected: 0, Actual: 3. Relative Error: undefined")) } it("should not fail when double column matches max value") { @@ -128,8 +128,8 @@ class ColumnBasedSpec extends FunSpec with Matchers with TestingSparkSession { assert(sut.failed) assert(columnMaxCheck.getEvents contains ColumnBasedValidatorCheckEvent(true, - LinkedHashMap("expected" -> "5.0", "actual" -> "3.5", "error_percent" -> "30.00%").toMap, - "ColumnMaxCheck double[DoubleType]: Expected: 5.0, Actual: 3.5. Error %: 30.00%")) + ListMap("expected" -> "5.0", "actual" -> "3.5", "relative_error" -> "30.00%").toMap, + "ColumnMaxCheck double[DoubleType]: Expected: 5.0, Actual: 3.5. 
Relative Error: 30.00%")) } it("should fail when byte column and value overflows") { diff --git a/src/test/scala/com/target/data_validator/validator/ColumnSumCheckSpec.scala b/src/test/scala/com/target/data_validator/validator/ColumnSumCheckSpec.scala index eb89dc7..9458fec 100644 --- a/src/test/scala/com/target/data_validator/validator/ColumnSumCheckSpec.scala +++ b/src/test/scala/com/target/data_validator/validator/ColumnSumCheckSpec.scala @@ -1,11 +1,13 @@ package com.target.data_validator.validator import com.target.TestingSparkSession -import com.target.data_validator.{ValidatorConfig, ValidatorDataFrame, ValidatorError} +import com.target.data_validator.{ColumnBasedValidatorCheckEvent, ValidatorConfig, ValidatorDataFrame, ValidatorError} import com.target.data_validator.TestHelpers.{mkDf, mkDict, parseYaml} import io.circe._ import org.scalatest.{FunSpec, Matchers} +import scala.collection.mutable.ListMap + class ColumnSumCheckSpec extends FunSpec with Matchers with TestingSparkSession { describe("ColumnSumCheck") { @@ -185,6 +187,11 @@ class ColumnSumCheckSpec extends FunSpec with Matchers with TestingSparkSession val sut = ValidatorDataFrame(df, None, None, List(check)) assert(sut.quickChecks(spark, mkDict())(config)) assert(sut.failed) + assert(check.getEvents contains + ColumnBasedValidatorCheckEvent(true, + ListMap("lower_bound" -> "6", "inclusive" -> "false", "actual" -> "6", + "relative_error" -> "undefined").toMap, + "columnSumCheck on foo[LongType]: Expected Range: (6 , ) Actual: 6 Relative Error: undefined")) } it("lower bound inclusive success") { @@ -200,7 +207,7 @@ class ColumnSumCheckSpec extends FunSpec with Matchers with TestingSparkSession assert(!sut.failed) } - it("upper bound success") { + it("upper bound success with short") { val check = ColumnSumCheck("foo", maxValue = Some(Json.fromDouble(10).get)) // scalastyle:ignore magic.number val df = mkDf(spark, "foo" -> List[Short](1, 2, 1)) val sut = ValidatorDataFrame(df, None, None, List(check)) @@ -208,12 +215,25 @@ class ColumnSumCheckSpec extends FunSpec with Matchers with TestingSparkSession assert(!sut.failed) } + it("upper bound success with byte") { + val check = ColumnSumCheck("foo", maxValue = Some(Json.fromDouble(10).get)) // scalastyle:ignore magic.number + val df = mkDf(spark, "foo" -> List[Byte](1, 2, 1)) + val sut = ValidatorDataFrame(df, None, None, List(check)) + assert(!sut.quickChecks(spark, mkDict())(config)) + assert(!sut.failed) + } + it("upper bound failure") { val check = ColumnSumCheck("foo", maxValue = Some(Json.fromFloat(1).get)) // scalastyle:ignore magic.number val df = mkDf(spark, "foo" -> List(1L, 1L, 1L)) val sut = ValidatorDataFrame(df, None, None, List(check)) assert(sut.quickChecks(spark, mkDict())(config)) assert(sut.failed) + assert(check.getEvents contains + ColumnBasedValidatorCheckEvent(true, + ListMap("upper_bound" -> "1.0", "inclusive" -> "false", "actual" -> "3", + "relative_error" -> "200.00%").toMap, + "columnSumCheck on foo[LongType]: Expected Range: ( , 1.0) Actual: 3 Relative Error: 200.00%")) } it("upper bound inclusive success") { @@ -250,6 +270,29 @@ class ColumnSumCheckSpec extends FunSpec with Matchers with TestingSparkSession val sut = ValidatorDataFrame(df, None, None, List(check)) assert(sut.quickChecks(spark, mkDict())(config)) assert(sut.failed) + assert(check.getEvents contains + ColumnBasedValidatorCheckEvent(true, + ListMap("lower_bound" -> "0", "upper_bound" -> "2", "inclusive" -> "false", "actual" -> "2", + "relative_error" -> "undefined").toMap, + 
"columnSumCheck on foo[LongType]: Expected Range: (0 , 2) Actual: 2 Relative Error: undefined")) + } + + it("upper and lower bound inclusive failure") { + val check = ColumnSumCheck( + "foo", + minValue = Some(Json.fromInt(10)), // scalastyle:ignore magic.number + maxValue = Some(Json.fromInt(20)), // scalastyle:ignore magic.number + inclusive = Some(Json.fromBoolean(true)) + ) + val df = mkDf(spark, "foo" -> List(1, 1, 1)) + val sut = ValidatorDataFrame(df, None, None, List(check)) + assert(sut.quickChecks(spark, mkDict())(config)) + assert(sut.failed) + assert(check.getEvents contains + ColumnBasedValidatorCheckEvent(true, + ListMap("lower_bound" -> "10", "upper_bound" -> "20", "inclusive" -> "true", "actual" -> "3", + "relative_error" -> "70.00%").toMap, + "columnSumCheck on foo[LongType]: Expected Range: [10 , 20] Actual: 3 Relative Error: 70.00%")) } it("upper bound and lower inclusive success") { From 51b30f8a04a71ea323cacb3c446bba4bd4b5ccfc Mon Sep 17 00:00:00 2001 From: SamratMitra Date: Tue, 26 May 2020 12:52:38 +0530 Subject: [PATCH 5/8] Minor change in README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index aaad15a..275c739 100644 --- a/README.md +++ b/README.md @@ -324,7 +324,7 @@ This check sums a column in all rows. If the sum applied to the `column` doesn't | `maxValue` | NumericType | The upper bound of the sum. Type depends on the type of the `column`. | | `inclusive` | Boolean | Include `minValue` and `maxValue` as part of the range. | -*Note:** If bounds are non-inclusive, and the actual sum is eequal to one of the bounds, the relative error percentage will be undefined. +**Note:** If bounds are non-inclusive, and the actual sum is eequal to one of the bounds, the relative error percentage will be undefined. ## Example Config From e8f681db474f39c50d8d8cd6010ba73be6bc66ca Mon Sep 17 00:00:00 2001 From: phpisciuneri Date: Wed, 27 May 2020 11:29:05 -0400 Subject: [PATCH 6/8] spelling in README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 275c739..082c7d0 100644 --- a/README.md +++ b/README.md @@ -324,7 +324,7 @@ This check sums a column in all rows. If the sum applied to the `column` doesn't | `maxValue` | NumericType | The upper bound of the sum. Type depends on the type of the `column`. | | `inclusive` | Boolean | Include `minValue` and `maxValue` as part of the range. | -**Note:** If bounds are non-inclusive, and the actual sum is eequal to one of the bounds, the relative error percentage will be undefined. +**Note:** If bounds are non-inclusive, and the actual sum is equal to one of the bounds, the relative error percentage will be undefined. ## Example Config From 954c18f228a184e04b6eaad2d583b1c739563c58 Mon Sep 17 00:00:00 2001 From: SamratMitra Date: Wed, 27 May 2020 21:25:38 +0530 Subject: [PATCH 7/8] Removed #45 changes and correct spelling mistake --- README.md | 2 +- .../scala/com/target/data_validator/TestHelpers.scala | 1 - .../data_validator/validator/ColumnSumCheckSpec.scala | 10 +--------- 3 files changed, 2 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 275c739..082c7d0 100644 --- a/README.md +++ b/README.md @@ -324,7 +324,7 @@ This check sums a column in all rows. If the sum applied to the `column` doesn't | `maxValue` | NumericType | The upper bound of the sum. Type depends on the type of the `column`. | | `inclusive` | Boolean | Include `minValue` and `maxValue` as part of the range. 
| -**Note:** If bounds are non-inclusive, and the actual sum is eequal to one of the bounds, the relative error percentage will be undefined. +**Note:** If bounds are non-inclusive, and the actual sum is equal to one of the bounds, the relative error percentage will be undefined. ## Example Config diff --git a/src/test/scala/com/target/data_validator/TestHelpers.scala b/src/test/scala/com/target/data_validator/TestHelpers.scala index d43a356..1277f8f 100644 --- a/src/test/scala/com/target/data_validator/TestHelpers.scala +++ b/src/test/scala/com/target/data_validator/TestHelpers.scala @@ -25,7 +25,6 @@ object TestHelpers { case "java.lang.Double" => DoubleType case "java.lang.Boolean" => BooleanType case "java.lang.Long" => LongType - case "java.lang.Byte" => ByteType case _ => throw new IllegalArgumentException(s"Unknown type '${v.getClass.getCanonicalName}'") } diff --git a/src/test/scala/com/target/data_validator/validator/ColumnSumCheckSpec.scala b/src/test/scala/com/target/data_validator/validator/ColumnSumCheckSpec.scala index 9458fec..a7ddc79 100644 --- a/src/test/scala/com/target/data_validator/validator/ColumnSumCheckSpec.scala +++ b/src/test/scala/com/target/data_validator/validator/ColumnSumCheckSpec.scala @@ -207,7 +207,7 @@ class ColumnSumCheckSpec extends FunSpec with Matchers with TestingSparkSession assert(!sut.failed) } - it("upper bound success with short") { + it("upper bound success") { val check = ColumnSumCheck("foo", maxValue = Some(Json.fromDouble(10).get)) // scalastyle:ignore magic.number val df = mkDf(spark, "foo" -> List[Short](1, 2, 1)) val sut = ValidatorDataFrame(df, None, None, List(check)) @@ -215,14 +215,6 @@ class ColumnSumCheckSpec extends FunSpec with Matchers with TestingSparkSession assert(!sut.failed) } - it("upper bound success with byte") { - val check = ColumnSumCheck("foo", maxValue = Some(Json.fromDouble(10).get)) // scalastyle:ignore magic.number - val df = mkDf(spark, "foo" -> List[Byte](1, 2, 1)) - val sut = ValidatorDataFrame(df, None, None, List(check)) - assert(!sut.quickChecks(spark, mkDict())(config)) - assert(!sut.failed) - } - it("upper bound failure") { val check = ColumnSumCheck("foo", maxValue = Some(Json.fromFloat(1).get)) // scalastyle:ignore magic.number val df = mkDf(spark, "foo" -> List(1L, 1L, 1L)) From 6e704b52c870f0ab7b00f15dea52f70301357a7e Mon Sep 17 00:00:00 2001 From: phpisciuneri Date: Sat, 30 May 2020 07:53:05 -0400 Subject: [PATCH 8/8] remove mutable usage & formatting --- .../data_validator/ValidatorEvent.scala | 8 ++- .../validator/ColumnBased.scala | 42 ++++++------ .../validator/ColumnSumCheck.scala | 22 +++---- .../data_validator/ValidatorBaseSpec.scala | 24 ++++--- .../validator/ColumnBasedSpec.scala | 46 +++++++------ .../validator/ColumnSumCheckSpec.scala | 64 +++++++++++++------ 6 files changed, 119 insertions(+), 87 deletions(-) diff --git a/src/main/scala/com/target/data_validator/ValidatorEvent.scala b/src/main/scala/com/target/data_validator/ValidatorEvent.scala index 65ec20e..2046e66 100644 --- a/src/main/scala/com/target/data_validator/ValidatorEvent.scala +++ b/src/main/scala/com/target/data_validator/ValidatorEvent.scala @@ -36,9 +36,11 @@ case class ValidatorCheckEvent(failure: Boolean, label: String, count: Long, err } } -case class ColumnBasedValidatorCheckEvent(failure: Boolean, - data: Map[String, String], - msg: String) extends ValidatorEvent { +case class ColumnBasedValidatorCheckEvent( + failure: Boolean, + data: Map[String, String], + msg: String +) extends ValidatorEvent { override def 
failed: Boolean = failure override def toHTML: Text.all.Tag = { diff --git a/src/main/scala/com/target/data_validator/validator/ColumnBased.scala b/src/main/scala/com/target/data_validator/validator/ColumnBased.scala index 842ee4a..b0bc4d6 100644 --- a/src/main/scala/com/target/data_validator/validator/ColumnBased.scala +++ b/src/main/scala/com/target/data_validator/validator/ColumnBased.scala @@ -10,7 +10,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.expressions.aggregate.Max import org.apache.spark.sql.types._ -import scala.collection.mutable.ListMap +import scala.collection.immutable.ListMap import scala.math.abs abstract class ColumnBased(column: String, condTest: Expression) extends CheapCheck { @@ -57,9 +57,8 @@ case class MinNumRows(minNumRows: Long) extends ColumnBased("", ValidatorBase.L0 val pctError = if (failed) calculatePctError(minNumRows, count) else "0.00%" addEvent(ValidatorCounter("rowCount", count)) val msg = s"MinNumRowsCheck Expected: $minNumRows Actual: $count Relative Error: $pctError" - val data = ListMap("expected" -> minNumRows.toString, "actual" -> count.toString, - "relative_error" -> pctError) - addEvent(ColumnBasedValidatorCheckEvent(failed, data.toMap, msg)) + val data = ListMap("expected" -> minNumRows.toString, "actual" -> count.toString, "relative_error" -> pctError) + addEvent(ColumnBasedValidatorCheckEvent(failed, data, msg)) failed } @@ -89,18 +88,17 @@ case class ColumnMaxCheck(column: String, value: Json) val rMax = row(idx) logger.info(s"rMax: $rMax colType: $dataType value: $value valueClass: ${value.getClass.getCanonicalName}") - var errorMsg = "" - val data = ListMap.empty[String, String] - - def resultForString(): Unit = { + def resultForString: (ListMap[String, String], String) = { val (expected, actual) = (value.asString.getOrElse(""), row.getString(idx)) failed = expected != actual - data += ("expected" -> expected, "actual" -> actual) - errorMsg = s"ColumnMaxCheck $column[StringType]: Expected: $expected, Actual: $actual" + val data = ListMap("expected" -> expected, "actual" -> actual) + val errorMsg = s"ColumnMaxCheck $column[StringType]: Expected: $expected Actual: $actual" + + (data, errorMsg) } - def resultForNumeric(): Unit = { + def resultForNumeric: (ListMap[String, String], String) = { val num = value.asNumber.get var cmp_params = (0.0, 0.0) // (expected, actual) @@ -115,28 +113,32 @@ case class ColumnMaxCheck(column: String, value: Json) failed = cmp_params._1 != cmp_params._2 val pctError = if (failed) calculatePctError(cmp_params._1, cmp_params._2) else "0.00%" - data += ("expected" -> num.toString, "actual" -> rMax.toString, "relative_error" -> pctError) - errorMsg = s"ColumnMaxCheck $column[$dataType]: Expected: $num, Actual: $rMax. Relative Error: ${pctError}" + val data = ListMap("expected" -> num.toString, "actual" -> rMax.toString, "relative_error" -> pctError) + val errorMsg = s"ColumnMaxCheck $column[$dataType]: Expected: $num Actual: $rMax Relative Error: $pctError" + + (data, errorMsg) } - def resultForOther(): Unit = { + def resultForOther: (ListMap[String, String], String) = { logger.error( s"""ColumnMaxCheck for type: $dataType, Row: $row not implemented! 
|Please open a bug report on the data-validator issue tracker.""".stripMargin ) failed = true - errorMsg = s"ColumnMaxCheck is not supported for data type $dataType" + val errorMsg = s"ColumnMaxCheck is not supported for data type $dataType" + + (ListMap.empty[String, String], errorMsg) } - dataType match { - case StringType => resultForString() - case _: NumericType => resultForNumeric() - case _ => resultForOther() + val (data, errorMsg) = dataType match { + case StringType => resultForString + case _: NumericType => resultForNumeric + case _ => resultForOther } logger.debug(s"MaxValue compared Row: $row with value: $value failed: $failed") if (failed) { - addEvent(ColumnBasedValidatorCheckEvent(failed, data.toMap, errorMsg)) + addEvent(ColumnBasedValidatorCheckEvent(failed, data, errorMsg)) } failed } diff --git a/src/main/scala/com/target/data_validator/validator/ColumnSumCheck.scala b/src/main/scala/com/target/data_validator/validator/ColumnSumCheck.scala index 88c303e..9020d41 100644 --- a/src/main/scala/com/target/data_validator/validator/ColumnSumCheck.scala +++ b/src/main/scala/com/target/data_validator/validator/ColumnSumCheck.scala @@ -9,7 +9,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions.aggregate.Sum import org.apache.spark.sql.types._ -import scala.collection.mutable.ListMap +import scala.collection.immutable.ListMap case class ColumnSumCheck( column: String, @@ -80,16 +80,14 @@ case class ColumnSumCheck( } } - def getData(actualSum: Double, pctError: String): ListMap[String, String] = { - val data = (minValue, maxValue) match { + def getData(pctError: String): ListMap[String, String] = { + ((minValue, maxValue) match { case (Some(x), Some(y)) => ListMap("lower_bound" -> x.asNumber.get.toString, "upper_bound" -> y.asNumber.get.toString) case (None, Some(y)) => ListMap("upper_bound" -> y.asNumber.get.toString) case (Some(x), None) => ListMap("lower_bound" -> x.asNumber.get.toString) case (None, None) => throw new RuntimeException("Must define at least one of minValue or maxValue.") - } - - data += ("inclusive" -> isInclusive.toString, "actual" -> r(idx).toString, "relative_error" -> pctError) + }) + ("inclusive" -> isInclusive.toString, "actual" -> r(idx).toString, "relative_error" -> pctError) } val actualSum: Double = dataType match { @@ -104,17 +102,17 @@ case class ColumnSumCheck( failed = evaluate(actualSum) val pctError = getPctError(actualSum) - val data = getData(actualSum, pctError) + val data = getData(pctError) - val bounds = minValue.getOrElse("") :: maxValue.getOrElse("") :: Nil - val prettyBounds = if (inclusiveBounds.right.get) { - bounds.mkString("[", " , ", "]") + val bounds = minValue.getOrElse(" ") :: maxValue.getOrElse("") :: Nil + val prettyBounds = if (isInclusive) { + bounds.mkString("[", ", ", "]") } else { - bounds.mkString("(", " , ", ")") + bounds.mkString("(", ", ", ")") } val msg = s"$name on $column[$dataType]: Expected Range: $prettyBounds Actual: ${r(idx)} Relative Error: $pctError" - addEvent(ColumnBasedValidatorCheckEvent(failed, data.toMap, msg)) + addEvent(ColumnBasedValidatorCheckEvent(failed, data, msg)) failed } diff --git a/src/test/scala/com/target/data_validator/ValidatorBaseSpec.scala b/src/test/scala/com/target/data_validator/ValidatorBaseSpec.scala index cf92348..501f60b 100644 --- a/src/test/scala/com/target/data_validator/ValidatorBaseSpec.scala +++ b/src/test/scala/com/target/data_validator/ValidatorBaseSpec.scala @@ -10,7 +10,7 @@ import 
org.apache.spark.sql.catalyst.expressions.Literal import org.apache.spark.sql.types._ import org.scalatest._ -import scala.collection.mutable.ListMap +import scala.collection.immutable.ListMap import scala.util.Random class ValidatorBaseSpec extends FunSpec with Matchers with TestingSparkSession { @@ -183,28 +183,32 @@ class ValidatorBaseSpec extends FunSpec with Matchers with TestingSparkSession { it("quickCheck() should fail when rowCount < minNumRows") { val dict = new VarSubstitution val df = spark.createDataFrame(sc.parallelize(List(Row("Doug", 50), Row("Collin", 32))), schema) //scalastyle:ignore - val minNumRowsCheck = MinNumRows(10) - val config = mkConfig(df, List(minNumRowsCheck)) //scalastyle:ignore + val minNumRowsCheck = MinNumRows(10) // scalastyle:ignore magic.number + val config = mkConfig(df, List(minNumRowsCheck)) assert(config.quickChecks(spark, dict)) assert(config.failed) assert(config.tables.head.failed) - assert(minNumRowsCheck.getEvents contains ColumnBasedValidatorCheckEvent(true, - ListMap("expected" -> "10", "actual" -> "2", "relative_error" -> "80.00%").toMap, - "MinNumRowsCheck Expected: 10 Actual: 2 Relative Error: 80.00%")) + assert(minNumRowsCheck.getEvents contains ColumnBasedValidatorCheckEvent( + failure = true, + ListMap("expected" -> "10", "actual" -> "2", "relative_error" -> "80.00%"), + "MinNumRowsCheck Expected: 10 Actual: 2 Relative Error: 80.00%" + )) } it("quickCheck() should succeed when rowCount > minNumRows") { val dict = new VarSubstitution val df = spark.createDataFrame(sc.parallelize(List(Row("Doug", 50), Row("Collin", 32))), schema) //scalastyle:ignore val minNumRowsCheck = MinNumRows(1) - val config = mkConfig(df, List(minNumRowsCheck)) //scalastyle:ignore + val config = mkConfig(df, List(minNumRowsCheck)) assert(!config.configCheck(spark, dict)) assert(!config.quickChecks(spark, dict)) assert(!config.failed) assert(!config.tables.exists(_.failed)) - assert(minNumRowsCheck.getEvents contains ColumnBasedValidatorCheckEvent(false, - ListMap("expected" -> "1", "actual" -> "2", "relative_error" -> "0.00%").toMap, - "MinNumRowsCheck Expected: 1 Actual: 2 Relative Error: 0.00%")) + assert(minNumRowsCheck.getEvents contains ColumnBasedValidatorCheckEvent( + failure = false, + ListMap("expected" -> "1", "actual" -> "2", "relative_error" -> "0.00%"), + "MinNumRowsCheck Expected: 1 Actual: 2 Relative Error: 0.00%" + )) } } diff --git a/src/test/scala/com/target/data_validator/validator/ColumnBasedSpec.scala b/src/test/scala/com/target/data_validator/validator/ColumnBasedSpec.scala index a032f87..c57a7c7 100644 --- a/src/test/scala/com/target/data_validator/validator/ColumnBasedSpec.scala +++ b/src/test/scala/com/target/data_validator/validator/ColumnBasedSpec.scala @@ -8,7 +8,7 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.types._ import org.scalatest._ -import scala.collection.mutable.ListMap +import scala.collection.immutable.ListMap class ColumnBasedSpec extends FunSpec with Matchers with TestingSparkSession { @@ -71,10 +71,11 @@ class ColumnBasedSpec extends FunSpec with Matchers with TestingSparkSession { assert(!sut.configCheck(spark, dict)) assert(sut.quickChecks(spark, dict)) assert(sut.failed) - assert(columnMaxCheck.getEvents contains - ColumnBasedValidatorCheckEvent(true, - ListMap("expected" -> "2018/11/01", "actual" -> "2018/10/31").toMap, - "ColumnMaxCheck data[StringType]: Expected: 2018/11/01, Actual: 2018/10/31")) + assert(columnMaxCheck.getEvents contains ColumnBasedValidatorCheckEvent( + failure = true, + 
ListMap("expected" -> "2018/11/01", "actual" -> "2018/10/31"), + "ColumnMaxCheck data[StringType]: Expected: 2018/11/01 Actual: 2018/10/31" + )) } it("should not fail with numeric column matches max value") { @@ -87,28 +88,30 @@ class ColumnBasedSpec extends FunSpec with Matchers with TestingSparkSession { it("should fail when numeric column doesn't match max value") { val dict = new VarSubstitution - val columnMaxCheck = ColumnMaxCheck("number", Json.fromInt(100)) - val sut = mkValidatorConfig(List(columnMaxCheck)) // scalastyle:ignore + val columnMaxCheck = ColumnMaxCheck("number", Json.fromInt(100)) // scalastyle:ignore magic.number + val sut = mkValidatorConfig(List(columnMaxCheck)) assert(!sut.configCheck(spark, dict)) assert(sut.quickChecks(spark, dict)) assert(sut.failed) - assert(columnMaxCheck.getEvents contains - ColumnBasedValidatorCheckEvent(true, - ListMap("expected" -> "100", "actual" -> "3", "relative_error" -> "97.00%").toMap, - "ColumnMaxCheck number[IntegerType]: Expected: 100, Actual: 3. Relative Error: 97.00%")) + assert(columnMaxCheck.getEvents contains ColumnBasedValidatorCheckEvent( + failure = true, + ListMap("expected" -> "100", "actual" -> "3", "relative_error" -> "97.00%"), + "ColumnMaxCheck number[IntegerType]: Expected: 100 Actual: 3 Relative Error: 97.00%" + )) } it("should fail with undefined error % when numeric column doesn't match max value and expected value is 0") { val dict = new VarSubstitution val columnMaxCheck = ColumnMaxCheck("number", Json.fromInt(0)) - val sut = mkValidatorConfig(List(columnMaxCheck)) // scalastyle:ignore + val sut = mkValidatorConfig(List(columnMaxCheck)) assert(!sut.configCheck(spark, dict)) assert(sut.quickChecks(spark, dict)) assert(sut.failed) - assert(columnMaxCheck.getEvents contains - ColumnBasedValidatorCheckEvent(true, - ListMap("expected" -> "0", "actual" -> "3", "relative_error" -> "undefined").toMap, - "ColumnMaxCheck number[IntegerType]: Expected: 0, Actual: 3. Relative Error: undefined")) + assert(columnMaxCheck.getEvents contains ColumnBasedValidatorCheckEvent( + failure = true, + ListMap("expected" -> "0", "actual" -> "3", "relative_error" -> "undefined"), + "ColumnMaxCheck number[IntegerType]: Expected: 0 Actual: 3 Relative Error: undefined" + )) } it("should not fail when double column matches max value") { @@ -122,14 +125,15 @@ class ColumnBasedSpec extends FunSpec with Matchers with TestingSparkSession { it("should fail when double column doesn't match max value") { val dict = new VarSubstitution val columnMaxCheck = ColumnMaxCheck("double", Json.fromDouble(5.0).get) - val sut = mkValidatorConfig(List(columnMaxCheck)) // scalastyle:ignore + val sut = mkValidatorConfig(List(columnMaxCheck)) assert(!sut.configCheck(spark, dict)) assert(sut.quickChecks(spark, dict)) assert(sut.failed) - assert(columnMaxCheck.getEvents contains - ColumnBasedValidatorCheckEvent(true, - ListMap("expected" -> "5.0", "actual" -> "3.5", "relative_error" -> "30.00%").toMap, - "ColumnMaxCheck double[DoubleType]: Expected: 5.0, Actual: 3.5. 
Relative Error: 30.00%")) + assert(columnMaxCheck.getEvents contains ColumnBasedValidatorCheckEvent( + failure = true, + ListMap("expected" -> "5.0", "actual" -> "3.5", "relative_error" -> "30.00%"), + "ColumnMaxCheck double[DoubleType]: Expected: 5.0 Actual: 3.5 Relative Error: 30.00%" + )) } it("should fail when byte column and value overflows") { diff --git a/src/test/scala/com/target/data_validator/validator/ColumnSumCheckSpec.scala b/src/test/scala/com/target/data_validator/validator/ColumnSumCheckSpec.scala index a7ddc79..a48de91 100644 --- a/src/test/scala/com/target/data_validator/validator/ColumnSumCheckSpec.scala +++ b/src/test/scala/com/target/data_validator/validator/ColumnSumCheckSpec.scala @@ -6,7 +6,7 @@ import com.target.data_validator.TestHelpers.{mkDf, mkDict, parseYaml} import io.circe._ import org.scalatest.{FunSpec, Matchers} -import scala.collection.mutable.ListMap +import scala.collection.immutable.ListMap class ColumnSumCheckSpec extends FunSpec with Matchers with TestingSparkSession { @@ -187,11 +187,16 @@ class ColumnSumCheckSpec extends FunSpec with Matchers with TestingSparkSession val sut = ValidatorDataFrame(df, None, None, List(check)) assert(sut.quickChecks(spark, mkDict())(config)) assert(sut.failed) - assert(check.getEvents contains - ColumnBasedValidatorCheckEvent(true, - ListMap("lower_bound" -> "6", "inclusive" -> "false", "actual" -> "6", - "relative_error" -> "undefined").toMap, - "columnSumCheck on foo[LongType]: Expected Range: (6 , ) Actual: 6 Relative Error: undefined")) + assert(check.getEvents contains ColumnBasedValidatorCheckEvent( + failure = true, + ListMap( + "lower_bound" -> "6", + "inclusive" -> "false", + "actual" -> "6", + "relative_error" -> "undefined" + ), + "columnSumCheck on foo[LongType]: Expected Range: (6, ) Actual: 6 Relative Error: undefined" + )) } it("lower bound inclusive success") { @@ -221,11 +226,16 @@ class ColumnSumCheckSpec extends FunSpec with Matchers with TestingSparkSession val sut = ValidatorDataFrame(df, None, None, List(check)) assert(sut.quickChecks(spark, mkDict())(config)) assert(sut.failed) - assert(check.getEvents contains - ColumnBasedValidatorCheckEvent(true, - ListMap("upper_bound" -> "1.0", "inclusive" -> "false", "actual" -> "3", - "relative_error" -> "200.00%").toMap, - "columnSumCheck on foo[LongType]: Expected Range: ( , 1.0) Actual: 3 Relative Error: 200.00%")) + assert(check.getEvents contains ColumnBasedValidatorCheckEvent( + failure = true, + ListMap( + "upper_bound" -> "1.0", + "inclusive" -> "false", + "actual" -> "3", + "relative_error" -> "200.00%" + ), + "columnSumCheck on foo[LongType]: Expected Range: ( , 1.0) Actual: 3 Relative Error: 200.00%" + )) } it("upper bound inclusive success") { @@ -262,11 +272,17 @@ class ColumnSumCheckSpec extends FunSpec with Matchers with TestingSparkSession val sut = ValidatorDataFrame(df, None, None, List(check)) assert(sut.quickChecks(spark, mkDict())(config)) assert(sut.failed) - assert(check.getEvents contains - ColumnBasedValidatorCheckEvent(true, - ListMap("lower_bound" -> "0", "upper_bound" -> "2", "inclusive" -> "false", "actual" -> "2", - "relative_error" -> "undefined").toMap, - "columnSumCheck on foo[LongType]: Expected Range: (0 , 2) Actual: 2 Relative Error: undefined")) + assert(check.getEvents contains ColumnBasedValidatorCheckEvent( + failure = true, + ListMap( + "lower_bound" -> "0", + "upper_bound" -> "2", + "inclusive" -> "false", + "actual" -> "2", + "relative_error" -> "undefined" + ), + "columnSumCheck on foo[LongType]: Expected 
Range: (0, 2) Actual: 2 Relative Error: undefined" + )) } it("upper and lower bound inclusive failure") { @@ -280,11 +296,17 @@ class ColumnSumCheckSpec extends FunSpec with Matchers with TestingSparkSession val sut = ValidatorDataFrame(df, None, None, List(check)) assert(sut.quickChecks(spark, mkDict())(config)) assert(sut.failed) - assert(check.getEvents contains - ColumnBasedValidatorCheckEvent(true, - ListMap("lower_bound" -> "10", "upper_bound" -> "20", "inclusive" -> "true", "actual" -> "3", - "relative_error" -> "70.00%").toMap, - "columnSumCheck on foo[LongType]: Expected Range: [10 , 20] Actual: 3 Relative Error: 70.00%")) + assert(check.getEvents contains ColumnBasedValidatorCheckEvent( + failure = true, + ListMap( + "lower_bound" -> "10", + "upper_bound" -> "20", + "inclusive" -> "true", + "actual" -> "3", + "relative_error" -> "70.00%" + ), + "columnSumCheck on foo[LongType]: Expected Range: [10, 20] Actual: 3 Relative Error: 70.00%" + )) } it("upper bound and lower inclusive success") {
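
Note on the expectations encoded throughout these specs: the relative error is |expected - actual| / expected, formatted to two decimal places, and it is reported as "undefined" either when the division is impossible (expected is 0) or when a non-inclusive bound is met exactly, per the README note above. A minimal sketch of that rule for reference (a hypothetical helper for illustration only, not the repo's actual calculatePctError implementation):

    import scala.math.abs

    object PctErrorSketch {
      // Hypothetical helper mirroring the relative-error rule the specs encode.
      def pctError(expected: Double, actual: Double): String =
        if (expected == 0.0 || expected == actual) "undefined" // divide-by-zero, or an open bound met exactly
        else f"${abs((expected - actual) / expected) * 100}%.2f%%"
    }

    // Spot checks against the expectations asserted above:
    // PctErrorSketch.pctError(10, 2)   == "80.00%"    (MinNumRows: expected 10, actual 2)
    // PctErrorSketch.pctError(100, 3)  == "97.00%"    (ColumnMaxCheck: expected 100, actual 3)
    // PctErrorSketch.pctError(1.0, 3)  == "200.00%"   (ColumnSumCheck upper bound 1.0, actual 3)
    // PctErrorSketch.pctError(0, 3)    == "undefined" (expected value of 0)
    // PctErrorSketch.pctError(6, 6)    == "undefined" (non-inclusive bound met exactly)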