From 93ac6bb3eb63efb775b48af090a37a6cbe4f30c4 Mon Sep 17 00:00:00 2001
From: Hossein
Date: Thu, 24 Mar 2016 16:31:38 -0700
Subject: [PATCH 1/4] Added support for null, NaN and Inf options for numeric
 types

---
 .../datasources/csv/CSVInferSchema.scala      | 74 ++++++++++++-------
 .../datasources/csv/CSVOptions.scala          | 35 +++++++++
 .../datasources/csv/CSVRelation.scala         |  2 +-
 sql/core/src/test/resources/numbers.csv       |  9 +++
 .../execution/datasources/csv/CSVSuite.scala  | 31 ++++++++
 .../datasources/csv/CSVTypeCastSuite.scala    | 73 +++++++++++++++++-
 6 files changed, 193 insertions(+), 31 deletions(-)
 create mode 100644 sql/core/src/test/resources/numbers.csv

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
index 797f740dc5b54..63bbed476c204 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
@@ -177,35 +177,57 @@ private[csv] object CSVTypeCast {
       datum: String,
       castType: DataType,
       nullable: Boolean = true,
-      nullValue: String = ""): Any = {
+      params: CSVOptions = CSVOptions()): Any = {
 
-    if (datum == nullValue && nullable && (!castType.isInstanceOf[StringType])) {
-      null
-    } else {
-      castType match {
-        case _: ByteType => datum.toByte
-        case _: ShortType => datum.toShort
-        case _: IntegerType => datum.toInt
-        case _: LongType => datum.toLong
-        case _: FloatType => Try(datum.toFloat)
-          .getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).floatValue())
-        case _: DoubleType => Try(datum.toDouble)
-          .getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).doubleValue())
-        case _: BooleanType => datum.toBoolean
-        case dt: DecimalType =>
+    castType match {
+      case _: ByteType => if (datum == params.byteNullValue && nullable) null else datum.toByte
+      case _: ShortType => if (datum == params.shortNullValue && nullable) null else datum.toShort
+      case _: IntegerType => if (datum == params.integerNullValue && nullable) null else datum.toInt
+      case _: LongType => if (datum == params.longNullValue && nullable) null else datum.toLong
+      case _: FloatType =>
+        if (datum == params.floatNullValue && nullable) {
+          null
+        } else if (datum == params.floatNaNValue) {
+          Float.NaN
+        } else if (datum == params.floatNegativeInf) {
+          Float.NegativeInfinity
+        } else if (datum == params.floatPositiveInf) {
+          Float.PositiveInfinity
+        } else {
+          Try(datum.toFloat)
+            .getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).floatValue())
+        }
+      case _: DoubleType =>
+        if (datum == params.doubleNullValue && nullable) {
+          null
+        } else if (datum == params.doubleNaNValue) {
+          Double.NaN
+        } else if (datum == params.doubleNegativeInf) {
+          Double.NegativeInfinity
+        } else if (datum == params.doublePositiveInf) {
+          Double.PositiveInfinity
+        } else {
+          Try(datum.toDouble)
+            .getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).doubleValue())
+        }
+      case _: BooleanType => datum.toBoolean
+      case dt: DecimalType =>
+        if (datum == params.decimalNullValue && nullable) {
+          null
+        } else {
          val value = new BigDecimal(datum.replaceAll(",", ""))
          Decimal(value, dt.precision, dt.scale)
-        // TODO(hossein): would be good to support other common timestamp formats
-        case _: TimestampType =>
-          // This one will lose microseconds parts.
-          // See https://issues.apache.org/jira/browse/SPARK-10681.
-          DateTimeUtils.stringToTime(datum).getTime * 1000L
-        // TODO(hossein): would be good to support other common date formats
-        case _: DateType =>
-          DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)
-        case _: StringType => UTF8String.fromString(datum)
-        case _ => throw new RuntimeException(s"Unsupported type: ${castType.typeName}")
-      }
+        }
+      // TODO(hossein): would be good to support other common timestamp formats
+      case _: TimestampType =>
+        // This one will lose microseconds parts.
+        // See https://issues.apache.org/jira/browse/SPARK-10681.
+        DateTimeUtils.stringToTime(datum).getTime * 1000L
+      // TODO(hossein): would be good to support other common date formats
+      case _: DateType =>
+        DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(datum).getTime)
+      case _: StringType => UTF8String.fromString(datum)
+      case _ => throw new RuntimeException(s"Unsupported type: ${castType.typeName}")
    }
  }
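The shape of the hunk above: every numeric branch compares the raw token against a configured string before attempting numeric conversion, and the Float/Double branches additionally recognize NaN and positive/negative infinity tokens. A free-standing sketch of the FloatType branch for illustration (the name parseFloatToken and the inline default tokens are illustrative; the patch itself reads them from CSVOptions):

    import java.text.NumberFormat
    import java.util.Locale
    import scala.util.Try

    // Illustrative reduction of the FloatType branch: check the configured
    // special tokens first, then fall back to numeric parsing.
    def parseFloatToken(
        datum: String,
        nullable: Boolean,
        nullValue: String = "",
        nanValue: String = "NaN",
        negativeInf: String = "-Inf",
        positiveInf: String = "Inf"): Any = {
      if (datum == nullValue && nullable) null
      else if (datum == nanValue) Float.NaN
      else if (datum == negativeInf) Float.NegativeInfinity
      else if (datum == positiveInf) Float.PositiveInfinity
      else {
        // Fast path first; locale-aware parsing as a fallback, as in the patch.
        Try(datum.toFloat).getOrElse(
          NumberFormat.getInstance(Locale.getDefault).parse(datum).floatValue())
      }
    }

For example, parseFloatToken("-Inf", nullable = true) yields Float.NegativeInfinity, while parseFloatToken("", nullable = true) yields null.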
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
index 95de02cf5c182..bb777ecd171c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -86,6 +86,30 @@ private[sql] class CSVOptions(
 
   val nullValue = parameters.getOrElse("nullValue", "")
 
+  val integerNullValue = parameters.getOrElse("integerNullValue", nullValue)
+
+  val byteNullValue = parameters.getOrElse("byteNullValue", nullValue)
+
+  val shortNullValue = parameters.getOrElse("shortNullValue", nullValue)
+
+  val longNullValue = parameters.getOrElse("longNullValue", nullValue)
+
+  val floatNullValue = parameters.getOrElse("floatNullValue", nullValue)
+
+  val doubleNullValue = parameters.getOrElse("doubleNullValue", nullValue)
+
+  val decimalNullValue = parameters.getOrElse("decimalNullValue", nullValue)
+
+  val floatNaNValue = parameters.getOrElse("floatNaNValue", "NaN")
+
+  val doubleNaNValue = parameters.getOrElse("doubleNaNValue", "NaN")
+
+  val floatNegativeInf = parameters.getOrElse("floatNegativeInf", "-Inf")
+  val floatPositiveInf = parameters.getOrElse("floatPositiveInf", "Inf")
+
+  val doubleNegativeInf = parameters.getOrElse("doubleNegativeInf", "-Inf")
+  val doublePositiveInf = parameters.getOrElse("doublePositiveInf", "Inf")
+
   val compressionCodec: Option[String] = {
     val name = parameters.get("compression").orElse(parameters.get("codec"))
     name.map(CompressionCodecs.getCodecClassName)
@@ -101,3 +125,14 @@ private[sql] class CSVOptions(
 
   val rowSeparator = "\n"
 }
+
+object CSVOptions {
+
+  /** Used for convenient construction in unit tests */
+  def apply(): CSVOptions = new CSVOptions(Map.empty)
+
+  /** Used for convenient construction with a single option in unit tests */
+  def apply(paramName: String, paramValue: String): CSVOptions = {
+    new CSVOptions(Map(paramName -> paramValue))
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index 5501015775202..387ba84fd0eab 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -102,7 +102,7 @@ object CSVRelation extends Logging {
             indexSafeTokens(index),
             field.dataType,
             field.nullable,
-            params.nullValue)
+            params)
           if (subIndex < requiredSize) {
             row(subIndex) = value
           }
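The options side is a plain map-with-defaults pattern: each setting is a val resolved once via parameters.getOrElse, the per-type null tokens fall back to the generic nullValue, and the companion apply methods exist only so tests can construct instances tersely. A condensed sketch of the same pattern (NumericTokenOptions is an illustrative stand-in for CSVOptions, not a class in the patch):

    // Condensed illustration of the CSVOptions pattern: settings resolve once
    // from the backing map; per-type overrides fall back to the shared default.
    class NumericTokenOptions(parameters: Map[String, String]) {
      val nullValue: String = parameters.getOrElse("nullValue", "")
      val floatNullValue: String = parameters.getOrElse("floatNullValue", nullValue)
      val floatNaNValue: String = parameters.getOrElse("floatNaNValue", "NaN")
    }

    object NumericTokenOptions {
      // Convenience constructors mirroring CSVOptions.apply, for terse test setup.
      def apply(): NumericTokenOptions = new NumericTokenOptions(Map.empty)
      def apply(k: String, v: String): NumericTokenOptions =
        new NumericTokenOptions(Map(k -> v))
    }

With this shape, NumericTokenOptions("floatNaNValue", "nn").floatNaNValue is "nn", while floatNullValue still falls back to the empty-string default.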
diff --git a/sql/core/src/test/resources/numbers.csv b/sql/core/src/test/resources/numbers.csv
new file mode 100644
index 0000000000000..ce69513d5f4da
--- /dev/null
+++ b/sql/core/src/test/resources/numbers.csv
@@ -0,0 +1,9 @@
+int,long,float,double
+8,1000000,1.042,23848545.0374
+--,34232323,98.343,184721.23987223
+34,++,98.343,184721.23987223
+34,43323123,null,184721.23987223
+34,43323123,223823.9484,NULL
+34,43323123,223823.FNAN,DNAN
+34,43323123,223823.FINF,DINF
+34,43323123,223823.-FINF,-DINF
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 58d9d69d9a8a5..dd7b4bebf3510 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -45,6 +45,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   private val disableCommentsFile = "disable_comments.csv"
   private val boolFile = "bool.csv"
   private val simpleSparseFile = "simple_sparse.csv"
+  private val numbersFile = "numbers.csv"
 
   private def testFile(fileName: String): String = {
     Thread.currentThread().getContextClassLoader.getResource(fileName).toString
@@ -478,4 +479,34 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
 
     verifyCars(cars, withHeader = false, checkTypes = false)
   }
+
+  test("nulls, NaNs and Infinity values can be parsed") {
+    val numbers = sqlContext
+      .read
+      .format("csv")
+      .schema(StructType(List(
+        StructField("int", IntegerType, true),
+        StructField("long", LongType, true),
+        StructField("float", FloatType, true),
+        StructField("double", DoubleType, true)
+      )))
+      .options(Map(
+        "header" -> "true",
+        "mode" -> "DROPMALFORMED",
+        "integerNullValue" -> "--",
+        "longNullValue" -> "++",
+        "floatNullValue" -> "null",
+        "doubleNullValue" -> "NULL",
+        "floatNaNValue" -> "FNAN",
+        "doubleNaNValue" -> "DNAN",
+        "floatNegativeInf" -> "-FINF",
+        "floatPositiveInf" -> "FINF",
+        "doublePositiveInf" -> "DINF",
+        "doubleNegativeInf" -> "-DINF"))
+      .load(testFile(numbersFile))
+
+    assert(numbers.count() == 8)
+
+
+  }
 }
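Outside the suite, the same knobs travel through the ordinary DataFrameReader options map. A sketch of end-user usage with this commit's per-type option names (the path and the option subset are illustrative, and a SQLContext is assumed in scope, as in the test):

    import org.apache.spark.sql.types._

    // Each special token is configured per numeric type at this stage of the series.
    val numbers = sqlContext.read
      .format("csv")
      .schema(StructType(Seq(
        StructField("float", FloatType, nullable = true),
        StructField("double", DoubleType, nullable = true))))
      .option("header", "true")
      .option("floatNullValue", "null")    // token read back as SQL NULL
      .option("floatNaNValue", "FNAN")     // token read back as Float.NaN
      .option("doublePositiveInf", "DINF") // token read back as Double.PositiveInfinity
      .load("path/to/numbers.csv")         // hypothetical path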
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
index 5702a1b4ea1f7..1c7784ff6b828 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
@@ -27,6 +27,8 @@ import org.apache.spark.unsafe.types.UTF8String
 
 class CSVTypeCastSuite extends SparkFunSuite {
 
+  private def isNull(v: Any) = assert(v == null)
+
   test("Can parse decimal type values") {
     val stringValues = Seq("10.05", "1,000.01", "158,058,049.001")
     val decimalValues = Seq(10.05, 1000.01, 158058049.001)
@@ -64,17 +66,21 @@ class CSVTypeCastSuite extends SparkFunSuite {
   }
 
   test("Nullable types are handled") {
-    assert(CSVTypeCast.castTo("", IntegerType, nullable = true) == null)
+    assert(CSVTypeCast.castTo("", IntegerType, nullable = true, CSVOptions()) == null)
   }
 
   test("String type should always return the same as the input") {
-    assert(CSVTypeCast.castTo("", StringType, nullable = true) == UTF8String.fromString(""))
-    assert(CSVTypeCast.castTo("", StringType, nullable = false) == UTF8String.fromString(""))
+    assert(
+      CSVTypeCast.castTo("", StringType, nullable = true, CSVOptions()) ==
+        UTF8String.fromString(""))
+    assert(
+      CSVTypeCast.castTo("", StringType, nullable = false, CSVOptions()) ==
+        UTF8String.fromString(""))
   }
 
   test("Throws exception for empty string with non null type") {
     val exception = intercept[NumberFormatException]{
-      CSVTypeCast.castTo("", IntegerType, nullable = false)
+      CSVTypeCast.castTo("", IntegerType, nullable = false, CSVOptions())
     }
     assert(exception.getMessage.contains("For input string: \"\""))
   }
@@ -105,4 +111,63 @@ class CSVTypeCastSuite extends SparkFunSuite {
       Locale.setDefault(originalLocale)
     }
   }
+
+  test("Float NaN values are parsed correctly") {
+    val floatVal: Float = CSVTypeCast.castTo(
+      "nn", FloatType, nullable = true, CSVOptions("floatNaNValue", "nn")).asInstanceOf[Float]
+
+    // Java implements the IEEE-754 floating point standard which guarantees that any comparison
+    // against NaN will return false (except != which returns true)
+    assert(floatVal != floatVal)
+  }
+
+  test("Double NaN values are parsed correctly") {
+    val doubleVal: Double = CSVTypeCast.castTo(
+      "-", DoubleType, nullable = true, CSVOptions("doubleNaNValue", "-")).asInstanceOf[Double]
+
+    assert(doubleVal.isNaN)
+  }
+
+  test("Float infinite values can be parsed") {
+    val floatVal1 = CSVTypeCast.castTo(
+      "max", FloatType, nullable = true, CSVOptions("floatNegativeInf", "max")).asInstanceOf[Float]
+
+    assert(floatVal1 == Float.NegativeInfinity)
+
+    val floatVal2 = CSVTypeCast.castTo(
+      "max", FloatType, nullable = true, CSVOptions("floatPositiveInf", "max")).asInstanceOf[Float]
+
+    assert(floatVal2 == Float.PositiveInfinity)
+  }
+
+  test("Double infinite values can be parsed") {
+    val doubleVal1 = CSVTypeCast.castTo(
+      "max", DoubleType, nullable = true, CSVOptions("doubleNegativeInf", "max")
+    ).asInstanceOf[Double]
+
+    assert(doubleVal1 == Double.NegativeInfinity)
+
+    val doubleVal2 = CSVTypeCast.castTo(
+      "max", DoubleType, nullable = true, CSVOptions("doublePositiveInf", "max")
+    ).asInstanceOf[Double]
+
+    assert(doubleVal2 == Double.PositiveInfinity)
+  }
+
+  test("Type-specific null values are used for casting") {
+    isNull(
+      CSVTypeCast.castTo("-", ByteType, nullable = true, CSVOptions("byteNullValue", "-")))
+    isNull(
+      CSVTypeCast.castTo("-", ShortType, nullable = true, CSVOptions("shortNullValue", "-")))
+    isNull(
+      CSVTypeCast.castTo("-", IntegerType, nullable = true, CSVOptions("integerNullValue", "-")))
+    isNull(
+      CSVTypeCast.castTo("-", LongType, nullable = true, CSVOptions("longNullValue", "-")))
+    isNull(
+      CSVTypeCast.castTo("-", FloatType, nullable = true, CSVOptions("floatNullValue", "-")))
+    isNull(
+      CSVTypeCast.castTo("-", DoubleType, nullable = true, CSVOptions("doubleNullValue", "-")))
+    isNull(
+      CSVTypeCast.castTo("-", DecimalType.DoubleDecimal, true, CSVOptions("decimalNullValue", "-")))
+  }
 }
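The two NaN tests above rely on a standard-library fact worth spelling out: IEEE-754 makes NaN unequal to every value, including itself, so self-inequality and isNaN are interchangeable checks. A self-contained illustration:

    // NaN is the only value for which x != x holds under IEEE-754 semantics.
    val x: Float = Float.NaN
    assert(x != x)            // true only for NaN
    assert(x.isNaN)           // equivalent, and usually more readable
    assert(!(1.0f != 1.0f))   // ordinary values compare equal to themselves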
bb777ecd171c6..61ca52f229ff4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -128,10 +128,8 @@ private[sql] class CSVOptions(
 
 object CSVOptions {
 
-  /** Used for convenient construction in unit tests */
   def apply(): CSVOptions = new CSVOptions(Map.empty)
 
-  /** Used for convenient construction with a single option in unit tests */
   def apply(paramName: String, paramValue: String): CSVOptions = {
     new CSVOptions(Map(paramName -> paramValue))
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index dd7b4bebf3510..80a5e5379631a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -506,7 +506,5 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       .load(testFile(numbersFile))
 
     assert(numbers.count() == 8)
-
-
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
index 1c7784ff6b828..35e5e765d3d7b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.unsafe.types.UTF8String
 
 class CSVTypeCastSuite extends SparkFunSuite {
 
-  private def isNull(v: Any) = assert(v == null)
+  private def assertNull(v: Any) = assert(v == null)
 
   test("Can parse decimal type values") {
     val stringValues = Seq("10.05", "1,000.01", "158,058,049.001")

From 124873bd469b827ef8de11931001ba1186157dbb Mon Sep 17 00:00:00 2001
From: Hossein
Date: Tue, 5 Apr 2016 15:40:43 -0700
Subject: [PATCH 3/4] Using assertNull instead of isNull

---
 .../datasources/csv/CSVTypeCastSuite.scala | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
index 35e5e765d3d7b..a37a462b7b646 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
@@ -155,19 +155,19 @@ class CSVTypeCastSuite extends SparkFunSuite {
   }
 
   test("Type-specific null values are used for casting") {
-    isNull(
+    assertNull(
       CSVTypeCast.castTo("-", ByteType, nullable = true, CSVOptions("byteNullValue", "-")))
-    isNull(
+    assertNull(
       CSVTypeCast.castTo("-", ShortType, nullable = true, CSVOptions("shortNullValue", "-")))
-    isNull(
+    assertNull(
       CSVTypeCast.castTo("-", IntegerType, nullable = true, CSVOptions("integerNullValue", "-")))
-    isNull(
+    assertNull(
       CSVTypeCast.castTo("-", LongType, nullable = true, CSVOptions("longNullValue", "-")))
-    isNull(
+    assertNull(
       CSVTypeCast.castTo("-", FloatType, nullable = true, CSVOptions("floatNullValue", "-")))
-    isNull(
+    assertNull(
       CSVTypeCast.castTo("-", DoubleType, nullable = true, CSVOptions("doubleNullValue", "-")))
-    isNull(
+    assertNull(
       CSVTypeCast.castTo("-", DecimalType.DoubleDecimal, true, CSVOptions("decimalNullValue", "-")))
   }
 }
CSVOptions("decimalNullValue", "-"))) } } From 698b4b41baa1ebd5d66ea6242bcb39bcd0887f8b Mon Sep 17 00:00:00 2001 From: Hossein Date: Fri, 29 Apr 2016 19:17:58 -0700 Subject: [PATCH 4/4] Updated to reduce number of options --- .../datasources/csv/CSVInferSchema.scala | 26 +++++++++---------- .../datasources/csv/CSVOptions.scala | 24 +++-------------- sql/core/src/test/resources/numbers.csv | 12 ++++----- .../execution/datasources/csv/CSVSuite.scala | 14 +++------- .../datasources/csv/CSVTypeCastSuite.scala | 26 +++++++++---------- 5 files changed, 39 insertions(+), 63 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala index af9db005c71af..f42f832d66436 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala @@ -180,31 +180,31 @@ private[csv] object CSVTypeCast { params: CSVOptions = CSVOptions()): Any = { castType match { - case _: ByteType => if (datum == params.byteNullValue && nullable) null else datum.toByte - case _: ShortType => if (datum == params.shortNullValue && nullable) null else datum.toShort - case _: IntegerType => if (datum == params.integerNullValue && nullable) null else datum.toInt - case _: LongType => if (datum == params.longNullValue && nullable) null else datum.toLong + case _: ByteType => if (datum == params.nullValue && nullable) null else datum.toByte + case _: ShortType => if (datum == params.nullValue && nullable) null else datum.toShort + case _: IntegerType => if (datum == params.nullValue && nullable) null else datum.toInt + case _: LongType => if (datum == params.nullValue && nullable) null else datum.toLong case _: FloatType => - if (datum == params.floatNullValue && nullable) { + if (datum == params.nullValue && nullable) { null - } else if (datum == params.floatNaNValue) { + } else if (datum == params.nanValue) { Float.NaN - } else if (datum == params.floatNegativeInf) { + } else if (datum == params.negativeInf) { Float.NegativeInfinity - } else if (datum == params.floatPositiveInf) { + } else if (datum == params.positiveInf) { Float.PositiveInfinity } else { Try(datum.toFloat) .getOrElse(NumberFormat.getInstance(Locale.getDefault).parse(datum).floatValue()) } case _: DoubleType => - if (datum == params.doubleNullValue && nullable) { + if (datum == params.nullValue && nullable) { null - } else if (datum == params.doubleNaNValue) { + } else if (datum == params.nanValue) { Double.NaN - } else if (datum == params.doubleNegativeInf) { + } else if (datum == params.negativeInf) { Double.NegativeInfinity - } else if (datum == params.doublePositiveInf) { + } else if (datum == params.positiveInf) { Double.PositiveInfinity } else { Try(datum.toDouble) @@ -212,7 +212,7 @@ private[csv] object CSVTypeCast { } case _: BooleanType => datum.toBoolean case dt: DecimalType => - if (datum == params.decimalNullValue && nullable) { + if (datum == params.nullValue && nullable) { null } else { val value = new BigDecimal(datum.replaceAll(",", "")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala index b837342456b34..f728fec89581e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala
@@ -89,29 +89,11 @@ private[sql] class CSVOptions(@transient private val parameters: Map[String, Str
 
   val nullValue = parameters.getOrElse("nullValue", "")
 
-  val integerNullValue = parameters.getOrElse("integerNullValue", nullValue)
+  val nanValue = parameters.getOrElse("nanValue", "NaN")
 
-  val byteNullValue = parameters.getOrElse("byteNullValue", nullValue)
+  val positiveInf = parameters.getOrElse("positiveInf", "Inf")
+  val negativeInf = parameters.getOrElse("negativeInf", "-Inf")
 
-  val shortNullValue = parameters.getOrElse("shortNullValue", nullValue)
-
-  val longNullValue = parameters.getOrElse("longNullValue", nullValue)
-
-  val floatNullValue = parameters.getOrElse("floatNullValue", nullValue)
-
-  val doubleNullValue = parameters.getOrElse("doubleNullValue", nullValue)
-
-  val decimalNullValue = parameters.getOrElse("decimalNullValue", nullValue)
-
-  val floatNaNValue = parameters.getOrElse("floatNaNValue", "NaN")
-
-  val doubleNaNValue = parameters.getOrElse("doubleNaNValue", "NaN")
-
-  val floatNegativeInf = parameters.getOrElse("floatNegativeInf", "-Inf")
-  val floatPositiveInf = parameters.getOrElse("floatPositiveInf", "Inf")
-
-  val doubleNegativeInf = parameters.getOrElse("doubleNegativeInf", "-Inf")
-  val doublePositiveInf = parameters.getOrElse("doublePositiveInf", "Inf")
+
 
   val compressionCodec: Option[String] = {
     val name = parameters.get("compression").orElse(parameters.get("codec"))
diff --git a/sql/core/src/test/resources/numbers.csv b/sql/core/src/test/resources/numbers.csv
index ce69513d5f4da..af8feac784d80 100644
--- a/sql/core/src/test/resources/numbers.csv
+++ b/sql/core/src/test/resources/numbers.csv
@@ -1,9 +1,9 @@
 int,long,float,double
 8,1000000,1.042,23848545.0374
 --,34232323,98.343,184721.23987223
-34,++,98.343,184721.23987223
-34,43323123,null,184721.23987223
-34,43323123,223823.9484,NULL
-34,43323123,223823.FNAN,DNAN
-34,43323123,223823.FINF,DINF
-34,43323123,223823.-FINF,-DINF
+34,--,98.343,184721.23987223
+34,43323123,--,184721.23987223
+34,43323123,223823.9484,--
+34,43323123,223823.NAN,NAN
+34,43323123,223823.INF,INF
+34,43323123,223823.-INF,-INF
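With the consolidation, the user-facing surface shrinks to four string options that apply uniformly to all numeric columns. A sketch of reader usage after this commit (the path is illustrative, and a SQLContext is assumed in scope):

    // Final option surface: one token per special value, shared across types.
    val df = sqlContext.read
      .format("csv")
      .option("header", "true")
      .option("nullValue", "--")     // parsed as SQL NULL in any numeric column
      .option("nanValue", "NAN")     // parsed as Float.NaN / Double.NaN
      .option("positiveInf", "INF")
      .option("negativeInf", "-INF")
      .load("path/to/numbers.csv")   // hypothetical path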
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index f6dbd6f3523f4..6b3c44e711ccb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -500,16 +500,10 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
       .options(Map(
         "header" -> "true",
         "mode" -> "DROPMALFORMED",
-        "integerNullValue" -> "--",
-        "longNullValue" -> "++",
-        "floatNullValue" -> "null",
-        "doubleNullValue" -> "NULL",
-        "floatNaNValue" -> "FNAN",
-        "doubleNaNValue" -> "DNAN",
-        "floatNegativeInf" -> "-FINF",
-        "floatPositiveInf" -> "FINF",
-        "doublePositiveInf" -> "DINF",
-        "doubleNegativeInf" -> "-DINF"))
+        "nullValue" -> "--",
+        "nanValue" -> "NAN",
+        "negativeInf" -> "-INF",
+        "positiveInf" -> "INF"))
       .load(testFile(numbersFile))
 
     assert(numbers.count() == 8)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
index a37a462b7b646..6de3fd0810e77 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVTypeCastSuite.scala
@@ -114,7 +114,7 @@ class CSVTypeCastSuite extends SparkFunSuite {
 
   test("Float NaN values are parsed correctly") {
     val floatVal: Float = CSVTypeCast.castTo(
-      "nn", FloatType, nullable = true, CSVOptions("floatNaNValue", "nn")).asInstanceOf[Float]
+      "nn", FloatType, nullable = true, CSVOptions("nanValue", "nn")).asInstanceOf[Float]
 
     // Java implements the IEEE-754 floating point standard which guarantees that any comparison
     // against NaN will return false (except != which returns true)
@@ -123,32 +123,32 @@ class CSVTypeCastSuite extends SparkFunSuite {
 
   test("Double NaN values are parsed correctly") {
     val doubleVal: Double = CSVTypeCast.castTo(
-      "-", DoubleType, nullable = true, CSVOptions("doubleNaNValue", "-")).asInstanceOf[Double]
+      "-", DoubleType, nullable = true, CSVOptions("nanValue", "-")).asInstanceOf[Double]
 
     assert(doubleVal.isNaN)
   }
 
   test("Float infinite values can be parsed") {
     val floatVal1 = CSVTypeCast.castTo(
-      "max", FloatType, nullable = true, CSVOptions("floatNegativeInf", "max")).asInstanceOf[Float]
+      "max", FloatType, nullable = true, CSVOptions("negativeInf", "max")).asInstanceOf[Float]
 
     assert(floatVal1 == Float.NegativeInfinity)
 
     val floatVal2 = CSVTypeCast.castTo(
-      "max", FloatType, nullable = true, CSVOptions("floatPositiveInf", "max")).asInstanceOf[Float]
+      "max", FloatType, nullable = true, CSVOptions("positiveInf", "max")).asInstanceOf[Float]
 
     assert(floatVal2 == Float.PositiveInfinity)
   }
 
   test("Double infinite values can be parsed") {
     val doubleVal1 = CSVTypeCast.castTo(
-      "max", DoubleType, nullable = true, CSVOptions("doubleNegativeInf", "max")
+      "max", DoubleType, nullable = true, CSVOptions("negativeInf", "max")
     ).asInstanceOf[Double]
 
     assert(doubleVal1 == Double.NegativeInfinity)
 
     val doubleVal2 = CSVTypeCast.castTo(
-      "max", DoubleType, nullable = true, CSVOptions("doublePositiveInf", "max")
+      "max", DoubleType, nullable = true, CSVOptions("positiveInf", "max")
     ).asInstanceOf[Double]
 
     assert(doubleVal2 == Double.PositiveInfinity)
@@ -156,18 +156,18 @@ class CSVTypeCastSuite extends SparkFunSuite {
 
   test("Type-specific null values are used for casting") {
     assertNull(
-      CSVTypeCast.castTo("-", ByteType, nullable = true, CSVOptions("byteNullValue", "-")))
+      CSVTypeCast.castTo("-", ByteType, nullable = true, CSVOptions("nullValue", "-")))
     assertNull(
-      CSVTypeCast.castTo("-", ShortType, nullable = true, CSVOptions("shortNullValue", "-")))
+      CSVTypeCast.castTo("-", ShortType, nullable = true, CSVOptions("nullValue", "-")))
     assertNull(
-      CSVTypeCast.castTo("-", IntegerType, nullable = true, CSVOptions("integerNullValue", "-")))
+      CSVTypeCast.castTo("-", IntegerType, nullable = true, CSVOptions("nullValue", "-")))
     assertNull(
-      CSVTypeCast.castTo("-", LongType, nullable = true, CSVOptions("longNullValue", "-")))
+      CSVTypeCast.castTo("-", LongType, nullable = true, CSVOptions("nullValue", "-")))
     assertNull(
-      CSVTypeCast.castTo("-", FloatType, nullable = true, CSVOptions("floatNullValue", "-")))
+      CSVTypeCast.castTo("-", FloatType, nullable = true, CSVOptions("nullValue", "-")))
     assertNull(
-      CSVTypeCast.castTo("-", DoubleType, nullable = true, CSVOptions("doubleNullValue", "-")))
+      CSVTypeCast.castTo("-", DoubleType, nullable = true, CSVOptions("nullValue", "-")))
     assertNull(
-      CSVTypeCast.castTo("-", DecimalType.DoubleDecimal, true, CSVOptions("decimalNullValue", "-")))
+      CSVTypeCast.castTo("-", DecimalType.DoubleDecimal, true, CSVOptions("nullValue", "-")))
   }
 }
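Taken together, the behavior the final suite pins down can be summarized in a few assertions (a test-style snippet written as if inside the csv package, since CSVTypeCast and CSVOptions are private[csv]):

    // Semantics after PATCH 4/4:
    // - nullValue applies to every numeric type, and only when nullable = true
    // - nanValue / positiveInf / negativeInf apply to Float and Double
    assert(CSVTypeCast.castTo("--", IntegerType, nullable = true,
      CSVOptions("nullValue", "--")) == null)
    assert(CSVTypeCast.castTo("INF", DoubleType, nullable = true,
      CSVOptions("positiveInf", "INF")) == Double.PositiveInfinity)
    assert(CSVTypeCast.castTo("NAN", FloatType, nullable = true,
      CSVOptions("nanValue", "NAN")).asInstanceOf[Float].isNaN)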