From d152cb5ac855eeeac962a4b547f6f96522fd1223 Mon Sep 17 00:00:00 2001 From: Rishabh Bhardwaj Date: Mon, 19 Oct 2015 12:12:56 +0530 Subject: [PATCH 1/2] [ SPARK-11180 ] [ SQL ] DataFrame.na.fill does not support Boolean Type --- .../spark/sql/DataFrameNaFunctions.scala | 29 ++++++++++++------- .../spark/sql/DataFrameNaFunctionsSuite.scala | 14 +++++---- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala index 77a42c0873a6b..f7be5f6b370ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala @@ -198,7 +198,8 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) { * Returns a new [[DataFrame]] that replaces null values. * * The key of the map is the column name, and the value of the map is the replacement value. - * The value must be of the following type: `Integer`, `Long`, `Float`, `Double`, `String`. + * The value must be of the following type: + * `Integer`, `Long`, `Float`, `Double`, `String`, `Boolean`. * * For example, the following replaces null values in column "A" with string "unknown", and * null values in column "B" with numeric value 1.0. @@ -215,7 +216,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) { * (Scala-specific) Returns a new [[DataFrame]] that replaces null values. * * The key of the map is the column name, and the value of the map is the replacement value. - * The value must be of the following type: `Int`, `Long`, `Float`, `Double`, `String`. + * The value must be of the following type: `Int`, `Long`, `Float`, `Double`, `String`, `Boolean`. * * For example, the following replaces null values in column "A" with string "unknown", and * null values in column "B" with numeric value 1.0. @@ -232,7 +233,8 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) { /** * Replaces values matching keys in `replacement` map with the corresponding values. - * Key and value of `replacement` map must have the same type, and can only be doubles or strings. + * Key and value of `replacement` map must have the same type, and + * can only be doubles, strings or booleans. * If `col` is "*", then the replacement is applied on all string columns or numeric columns. * * {{{ @@ -259,7 +261,8 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) { /** * Replaces values matching keys in `replacement` map with the corresponding values. - * Key and value of `replacement` map must have the same type, and can only be doubles or strings. + * Key and value of `replacement` map must have the same type, and + * can only be doubles, strings or booleans. * * {{{ * import com.google.common.collect.ImmutableMap; @@ -282,8 +285,10 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) { /** * (Scala-specific) Replaces values matching keys in `replacement` map. - * Key and value of `replacement` map must have the same type, and can only be doubles or strings. - * If `col` is "*", then the replacement is applied on all string columns or numeric columns. + * Key and value of `replacement` map must have the same type, and + * can only be doubles, strings or booleans. + * If `col` is "*", + * then the replacement is applied on all string columns , numeric columns or boolean columns. * * {{{ * // Replaces all occurrences of 1.0 with 2.0 in column "height". @@ -311,7 +316,8 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) { /** * (Scala-specific) Replaces values matching keys in `replacement` map. - * Key and value of `replacement` map must have the same type, and can only be doubles or strings. + * Key and value of `replacement` map must have the same type, and + * can only be doubles , strings or booleans. * * {{{ * // Replaces all occurrences of 1.0 with 2.0 in column "height" and "weight". @@ -333,15 +339,17 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) { return df } - // replacementMap is either Map[String, String] or Map[Double, Double] + // replacementMap is either Map[String, String] or Map[Double, Double] or Map[Boolean,Boolean] val replacementMap: Map[_, _] = replacement.head._2 match { case v: String => replacement + case v: Boolean => replacement case _ => replacement.map { case (k, v) => (convertToDouble(k), convertToDouble(v)) } } - // targetColumnType is either DoubleType or StringType + // targetColumnType is either DoubleType or StringType or BooleanType val targetColumnType = replacement.head._1 match { case _: jl.Double | _: jl.Float | _: jl.Integer | _: jl.Long => DoubleType + case _: jl.Boolean => BooleanType case _: String => StringType } @@ -367,7 +375,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) { // Check data type replaceValue match { - case _: jl.Double | _: jl.Float | _: jl.Integer | _: jl.Long | _: String => + case _: jl.Double | _: jl.Float | _: jl.Integer | _: jl.Long | _: jl.Boolean | _: String => // This is good case _ => throw new IllegalArgumentException( s"Unsupported value type ${replaceValue.getClass.getName} ($replaceValue).") @@ -382,6 +390,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) { case v: jl.Double => fillCol[Double](f, v) case v: jl.Long => fillCol[Double](f, v.toDouble) case v: jl.Integer => fillCol[Double](f, v.toDouble) + case v: jl.Boolean => fillCol[Boolean](f, v.booleanValue()) case v: String => fillCol[String](f, v) } }.getOrElse(df.col(f.name)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala index 329ffb66083b1..e34875471f093 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala @@ -141,24 +141,26 @@ class DataFrameNaFunctionsSuite extends QueryTest with SharedSQLContext { } test("fill with map") { - val df = Seq[(String, String, java.lang.Long, java.lang.Double)]( - (null, null, null, null)).toDF("a", "b", "c", "d") + val df = Seq[(String, String, java.lang.Long, java.lang.Double, java.lang.Boolean)]( + (null, null, null, null, null)).toDF("a", "b", "c", "d", "e") checkAnswer( df.na.fill(Map( "a" -> "test", "c" -> 1, - "d" -> 2.2 + "d" -> 2.2, + "e" -> false )), - Row("test", null, 1, 2.2)) + Row("test", null, 1, 2.2, false)) // Test Java version checkAnswer( df.na.fill(Map( "a" -> "test", "c" -> 1, - "d" -> 2.2 + "d" -> 2.2, + "e" -> false ).asJava), - Row("test", null, 1, 2.2)) + Row("test", null, 1, 2.2, false)) } test("replace") { From 61bb509e33614e63cbb8908593c88d424b635430 Mon Sep 17 00:00:00 2001 From: Rishabh Bhardwaj Date: Sun, 11 Jun 2017 00:12:08 +0530 Subject: [PATCH 2/2] [SPARK-21039] Using treeAggregate instead of aggregate in bloomFilter --- .../scala/org/apache/spark/sql/DataFrameStatFunctions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala index c856d3099f6ee..531c613afb0dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala @@ -551,7 +551,7 @@ final class DataFrameStatFunctions private[sql](df: DataFrame) { ) } - singleCol.queryExecution.toRdd.aggregate(zero)( + singleCol.queryExecution.toRdd.treeAggregate(zero)( (filter: BloomFilter, row: InternalRow) => { updater(filter, row) filter