From 0669ac4bf7807b0e3b6242f5c691a005ffe27548 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Sun, 26 Nov 2017 09:44:41 +0000 Subject: [PATCH 1/3] Initial commit --- .../spark/sql/catalyst/expressions/Cast.scala | 2 +- .../expressions/codegen/CodeGenerator.scala | 23 +++++++++++++++++++ .../sql/catalyst/expressions/predicates.scala | 10 ++++---- .../expressions/stringExpressions.scala | 20 +++++----------- 4 files changed, 34 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 12baddf1bf7ac..8cafaef61c7d1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -1040,7 +1040,7 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String } """ } - val fieldsEvalCodes = if (ctx.INPUT_ROW != null && ctx.currentVars == null) { + val fieldsEvalCodes = if (ctx.currentVars == null) { ctx.splitExpressions( expressions = fieldsEvalCode, funcName = "castStruct", diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 668c816b3fd8d..71df2d534d55a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -795,6 +795,29 @@ class CodegenContext { splitExpressions(expressions, funcName = "apply", arguments = ("InternalRow", INPUT_ROW) :: Nil) } + /** + * Splits the generated code of expressions into multiple functions, because function has + * 64kb code size limit in JVM. 
This version takes care of INPUT_ROW and currentVars + * + * @param expressions the codes to evaluate expressions. + * @param funcName the split function name base. + * @param argumentsExceptRow the list of (type, name) of the arguments of the split function + * except for ctx.INPUT_ROW + */ + def splitExpressions( + expressions: Seq[String], + funcName: String, + argumentsExceptRow: Seq[(String, String)]): String = { + if (INPUT_ROW == null || currentVars != null) { + // Cannot split these expressions because they are not created from a row object. + return expressions.mkString("\n") + } + splitExpressions( + expressions, + funcName, + arguments = ("InternalRow", INPUT_ROW) +: argumentsExceptRow) + } + /** * Splits the generated code of expressions into multiple functions, because function has * 64kb code size limit in JVM diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index eb7475354b104..1fee90e3b2147 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -251,12 +251,10 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate { } } """) - val listCodes = if (ctx.INPUT_ROW != null && ctx.currentVars == null) { - val args = ("InternalRow", ctx.INPUT_ROW) :: (ctx.javaType(value.dataType), valueArg) :: Nil - ctx.splitExpressions(expressions = listCode, funcName = "valueIn", arguments = args) - } else { - listCode.mkString("\n") - } + val listCodes = ctx.splitExpressions( + expressions = listCode, + funcName = "valueIn", + argumentsExceptRow = (ctx.javaType(value.dataType), valueArg) :: Nil) ev.copy(code = s""" ${valueGen.code} ${ev.value} = false; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala index ee5cf925d3cef..16b8b8b8e5e38 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala @@ -73,14 +73,10 @@ case class Concat(children: Seq[Expression]) extends Expression with ImplicitCas } """ } - val codes = if (ctx.INPUT_ROW != null && ctx.currentVars == null) { - ctx.splitExpressions( - expressions = inputs, - funcName = "valueConcat", - arguments = ("InternalRow", ctx.INPUT_ROW) :: ("UTF8String[]", args) :: Nil) - } else { - inputs.mkString("\n") - } + val codes = ctx.splitExpressions( + expressions = inputs, + funcName = "valueConcat", + argumentsExceptRow = ("UTF8String[]", args) :: Nil) ev.copy(s""" UTF8String[] $args = new UTF8String[${evals.length}]; $codes @@ -156,14 +152,10 @@ case class ConcatWs(children: Seq[Expression]) "" } } - val codes = if (ctx.INPUT_ROW != null && ctx.currentVars == null) { - ctx.splitExpressions( + val codes = ctx.splitExpressions( expressions = inputs, funcName = "valueConcatWs", - arguments = ("InternalRow", ctx.INPUT_ROW) :: ("UTF8String[]", args) :: Nil) - } else { - inputs.mkString("\n") - } + argumentsExceptRow = ("UTF8String[]", args) :: Nil) ev.copy(s""" UTF8String[] $args = new UTF8String[$numArgs]; ${separator.code} From 5332f1280b53aa760f193383104574256a1caa9e Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Wed, 29 Nov 2017 10:16:16 +0000 Subject: [PATCH 2/3] address review comments --- .../expressions/codegen/CodeGenerator.scala | 25 ++++++++----------- .../sql/catalyst/expressions/predicates.scala | 2 +- .../expressions/stringExpressions.scala | 16 +++++------- 3 files changed, 18 insertions(+), 25 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 71df2d534d55a..c083866cba02e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -788,11 +788,7 @@ class CodegenContext { * @param expressions the codes to evaluate expressions. */ def splitExpressions(expressions: Seq[String]): String = { - // TODO: support whole stage codegen - if (INPUT_ROW == null || currentVars != null) { - return expressions.mkString("\n") - } - splitExpressions(expressions, funcName = "apply", arguments = ("InternalRow", INPUT_ROW) :: Nil) + splitExpressions(expressions, funcName = "apply", extraArguments = Nil) } /** @@ -801,21 +797,22 @@ class CodegenContext { * * @param expressions the codes to evaluate expressions. * @param funcName the split function name base. - * @param argumentsExceptRow the list of (type, name) of the arguments of the split function - * except for ctx.INPUT_ROW + * @param extraArguments the list of (type, name) of the arguments of the split function + * except for ctx.INPUT_ROW */ def splitExpressions( expressions: Seq[String], funcName: String, - argumentsExceptRow: Seq[(String, String)]): String = { + extraArguments: Seq[(String, String)]): String = { + // TODO: support whole stage codegen if (INPUT_ROW == null || currentVars != null) { - // Cannot split these expressions because they are not created from a row object. 
- return expressions.mkString("\n") + expressions.mkString("\n") + } else { + splitExpressions( + expressions, + funcName, + arguments = ("InternalRow", INPUT_ROW) +: extraArguments) } - splitExpressions( - expressions, - funcName, - arguments = ("InternalRow", INPUT_ROW) +: argumentsExceptRow) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index 1fee90e3b2147..1aaaaf1db48d1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -254,7 +254,7 @@ case class In(value: Expression, list: Seq[Expression]) extends Predicate { val listCodes = ctx.splitExpressions( expressions = listCode, funcName = "valueIn", - argumentsExceptRow = (ctx.javaType(value.dataType), valueArg) :: Nil) + extraArguments = (ctx.javaType(value.dataType), valueArg) :: Nil) ev.copy(code = s""" ${valueGen.code} ${ev.value} = false; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala index 16b8b8b8e5e38..34917ace001fa 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala @@ -76,7 +76,7 @@ case class Concat(children: Seq[Expression]) extends Expression with ImplicitCas val codes = ctx.splitExpressions( expressions = inputs, funcName = "valueConcat", - argumentsExceptRow = ("UTF8String[]", args) :: Nil) + extraArguments = ("UTF8String[]", args) :: Nil) ev.copy(s""" UTF8String[] $args = new UTF8String[${evals.length}]; $codes @@ -155,7 +155,7 @@ case class ConcatWs(children: Seq[Expression]) val codes = 
ctx.splitExpressions( expressions = inputs, funcName = "valueConcatWs", - argumentsExceptRow = ("UTF8String[]", args) :: Nil) + extraArguments = ("UTF8String[]", args) :: Nil) ev.copy(s""" UTF8String[] $args = new UTF8String[$numArgs]; ${separator.code} @@ -1380,14 +1380,10 @@ case class FormatString(children: Expression*) extends Expression with ImplicitC $argList[$index] = $value; """ } - val argListCodes = if (ctx.INPUT_ROW != null && ctx.currentVars == null) { - ctx.splitExpressions( - expressions = argListCode, - funcName = "valueFormatString", - arguments = ("InternalRow", ctx.INPUT_ROW) :: ("Object[]", argList) :: Nil) - } else { - argListCode.mkString("\n") - } + val argListCodes = ctx.splitExpressions( + expressions = argListCode, + funcName = "valueFormatString", + extraArguments = ("Object[]", argList) :: Nil) val form = ctx.freshName("formatter") val formatter = classOf[java.util.Formatter].getName From 0a218fc74e129b81a1f68645d81350b7793feada Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Wed, 29 Nov 2017 14:18:18 +0000 Subject: [PATCH 3/3] address review comment --- .../sql/catalyst/expressions/codegen/CodeGenerator.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index c083866cba02e..1645db12c53f0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -792,8 +792,8 @@ class CodegenContext { } /** - * Splits the generated code of expressions into multiple functions, because function has - * 64kb code size limit in JVM. 
This version takes care of INPUT_ROW and currentVars + * Similar to [[splitExpressions(expressions: Seq[String])]], but takes a custom function name + * and extra arguments. * * @param expressions the codes to evaluate expressions. * @param funcName the split function name base.