From d87c5db66b2854b820bebefaa04efd6f6c11e212 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Mon, 13 Mar 2023 19:57:59 +0800 Subject: [PATCH 1/8] [SPARK-42052][SQL] Codegen Support for HiveSimpleUDF --- .../org/apache/spark/sql/hive/hiveUDFs.scala | 128 +++++++++++++----- .../sql/hive/execution/HiveUDFSuite.scala | 42 ++++++ 2 files changed, 139 insertions(+), 31 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index 67229d494a2dc..873855532a575 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -48,69 +48,135 @@ import org.apache.spark.sql.types._ private[hive] case class HiveSimpleUDF( name: String, funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) extends Expression - with HiveInspectors - with CodegenFallback - with Logging with UserDefinedExpression { - override lazy val deterministic: Boolean = isUDFDeterministic && children.forall(_.deterministic) + private[hive] lazy val helper = new HiveSimpleUDFHelper(funcWrapper, children) + + override lazy val deterministic: Boolean = helper.deterministic override def nullable: Boolean = true - @transient - lazy val function = funcWrapper.createFunction[UDF]() + override def foldable: Boolean = helper.foldable + + override lazy val dataType = helper.dataType + + // TODO: Finish input output types. + override def eval(input: InternalRow): Any = { + children.zipWithIndex.map { + case (child, idx) => + helper.setArg(idx, child.eval(input)) + } + helper.evaluate() + } + + override def toString: String = { + s"$nodeName#${funcWrapper.functionClassName}(${children.mkString(",")})" + } + + override def prettyName: String = name + + override def sql: String = s"$name(${children.map(_.sql).mkString(", ")})" + + override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression = + copy(children = newChildren) + + protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val refTerm = ctx.addReferenceObj("this", this) + val evals = children.map(_.genCode(ctx)) + + val setValues = evals.zipWithIndex.map { + case (eval, i) => + s""" + |if (${eval.isNull}) { + | $refTerm.helper().setArg($i, null); + |} else { + | $refTerm.helper().setArg($i, ${eval.value}); + |} + |""".stripMargin + } + + val resultType = CodeGenerator.boxedType(dataType) + val resultTerm = ctx.freshName("result") + ev.copy(code = + code""" + |${evals.map(_.code).mkString("\n")} + |${setValues.mkString("\n")} + |$resultType $resultTerm = null; + |boolean ${ev.isNull} = false; + |try { + | $resultTerm = ($resultType) $refTerm.helper().evaluate(); + | ${ev.isNull} = $resultTerm == null; + |} catch (Throwable e) { + | throw QueryExecutionErrors.failedExecuteUserDefinedFunctionError( + | "${funcWrapper.functionClassName}", + | "${children.map(_.dataType.catalogString).mkString(", ")}", + | "${dataType.catalogString}", + | e); + |} + |${CodeGenerator.javaType(dataType)} ${ev.value} = ${CodeGenerator.defaultValue(dataType)}; + |if (!${ev.isNull}) { + | ${ev.value} = $resultTerm; + |} + |""".stripMargin + ) + } +} + +class HiveSimpleUDFHelper( + funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) + extends HiveInspectors + with Serializable { @transient - private lazy val method = - function.getResolver.getEvalMethod(children.map(_.dataType.toTypeInfo).asJava) + private[hive] val deterministic = isUDFDeterministic && children.forall(_.deterministic) @transient - private lazy val arguments = children.map(toInspector).toArray + private[hive] def foldable: Boolean = isUDFDeterministic && children.forall(_.foldable) @transient - private lazy val isUDFDeterministic = { + private[hive] lazy val isUDFDeterministic = { val udfType = function.getClass.getAnnotation(classOf[HiveUDFType]) udfType != null && udfType.deterministic() && !udfType.stateful() } - override def foldable: Boolean = isUDFDeterministic && children.forall(_.foldable) + @transient + private[hive] val dataType = javaTypeToDataType(method.getGenericReturnType) - // Create parameter converters @transient - private lazy val conversionHelper = new ConversionHelper(method, arguments) + private lazy val function = funcWrapper.createFunction[UDF]() - override lazy val dataType = javaTypeToDataType(method.getGenericReturnType) + @transient + private lazy val method = + function.getResolver.getEvalMethod(children.map(_.dataType.toTypeInfo).asJava) @transient private lazy val wrappers = children.map(x => wrapperFor(toInspector(x), x.dataType)).toArray @transient - lazy val unwrapper = unwrapperFor(ObjectInspectorFactory.getReflectionObjectInspector( + private lazy val arguments = children.map(toInspector).toArray + + // Create parameter converters + @transient + private lazy val conversionHelper = new ConversionHelper(method, arguments) + + @transient + private lazy val unwrapper = unwrapperFor(ObjectInspectorFactory.getReflectionObjectInspector( method.getGenericReturnType, ObjectInspectorOptions.JAVA)) @transient - private lazy val cached: Array[AnyRef] = new Array[AnyRef](children.length) + private lazy val inputs: Array[AnyRef] = new Array[AnyRef](children.length) - // TODO: Finish input output types. - override def eval(input: InternalRow): Any = { - val inputs = wrap(children.map(_.eval(input)), wrappers, cached) + private[hive] def setArg(index: Int, arg: Any): Unit = { + inputs(index) = wrappers(index)(arg).asInstanceOf[AnyRef] + } + + private[hive] def evaluate(): Any = { val ret = FunctionRegistry.invoke( method, function, - conversionHelper.convertIfNecessary(inputs : _*): _*) + conversionHelper.convertIfNecessary(inputs: _*): _*) unwrapper(ret) } - - override def toString: String = { - s"$nodeName#${funcWrapper.functionClassName}(${children.mkString(",")})" - } - - override def prettyName: String = name - - override def sql: String = s"$name(${children.map(_.sql).mkString(", ")})" - - override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression = - copy(children = newChildren) } // Adapter from Catalyst ExpressionResult to Hive DeferredObject diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index baa25843d48b6..8fb9209f9cb41 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -24,6 +24,7 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.ql.exec.UDF +import org.apache.hadoop.hive.ql.metadata.HiveException import org.apache.hadoop.hive.ql.udf.{UDAFPercentile, UDFType} import org.apache.hadoop.hive.ql.udf.generic._ import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject @@ -743,6 +744,38 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { } } } + + test("SPARK-42052: HiveSimpleUDF Codegen Support") { + withUserDefinedFunction("CodeGenHiveSimpleUDF" -> false) { + sql(s"CREATE FUNCTION CodeGenHiveSimpleUDF AS '${classOf[UDFStringString].getName}'") + withTable("HiveSimpleUDFTable") { + sql(s"create table HiveSimpleUDFTable as select 'Spark SQL' as v") + val df = sql("SELECT CodeGenHiveSimpleUDF('Hello', v) from HiveSimpleUDFTable") + val plan = df.queryExecution.executedPlan + assert(plan.isInstanceOf[WholeStageCodegenExec]) + checkAnswer(df, Seq(Row("Hello Spark SQL"))) + } + } + } + + test("SPARK-42052: HiveSimpleUDF Codegen Support w/ execution failure") { + withUserDefinedFunction("CodeGenHiveSimpleUDF" -> false) { + sql(s"CREATE FUNCTION CodeGenHiveSimpleUDF AS '${classOf[SimpleUDFAssertTrue].getName}'") + withTable("HiveSimpleUDFTable") { + sql(s"create table HiveSimpleUDFTable as select false as v") + val df = sql("SELECT CodeGenHiveSimpleUDF(v) from HiveSimpleUDFTable") + checkError( + exception = intercept[SparkException](df.collect()).getCause.asInstanceOf[SparkException], + errorClass = "FAILED_EXECUTE_UDF", + parameters = Map( + "functionName" -> s"${classOf[SimpleUDFAssertTrue].getName}", + "signature" -> "boolean", + "result" -> "boolean" + ) + ) + } + } + } } class TestPair(x: Int, y: Int) extends Writable with Serializable { @@ -844,3 +877,12 @@ class ListFiles extends UDF { if (fileArray != null) Arrays.asList(fileArray: _*) else new ArrayList[String]() } } + +class SimpleUDFAssertTrue extends UDF { + def evaluate(condition: Boolean): Boolean = { + if (!condition) { + throw new HiveException("ASSERT_TRUE(): assertion failed."); + } + condition + } +} From bfbf0752830e66891423f1a682573fea8ee71ce5 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Tue, 14 Mar 2023 19:20:44 +0800 Subject: [PATCH 2/8] [SPARK-42052][SQL] Codegen Support for HiveSimpleUDF --- .../org/apache/spark/sql/hive/hiveUDFs.scala | 93 ++++++++++++------- 1 file changed, 59 insertions(+), 34 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index 873855532a575..3a3b77521c382 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -50,23 +50,24 @@ private[hive] case class HiveSimpleUDF( extends Expression with UserDefinedExpression { - private[hive] lazy val helper = new HiveSimpleUDFHelper(funcWrapper, children) + @transient + private lazy val evaluator = new HiveSimpleUDFEvaluator(funcWrapper, children) - override lazy val deterministic: Boolean = helper.deterministic + override lazy val deterministic: Boolean = evaluator.deterministic override def nullable: Boolean = true - override def foldable: Boolean = helper.foldable + override def foldable: Boolean = evaluator.foldable - override lazy val dataType = helper.dataType + override lazy val dataType = evaluator.dataType // TODO: Finish input output types. override def eval(input: InternalRow): Any = { children.zipWithIndex.map { case (child, idx) => - helper.setArg(idx, child.eval(input)) + evaluator.setArg(idx, child.eval(input)) } - helper.evaluate() + evaluator.evaluate() } override def toString: String = { @@ -81,16 +82,51 @@ private[hive] case class HiveSimpleUDF( copy(children = newChildren) protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { - val refTerm = ctx.addReferenceObj("this", this) + evaluator.doGenCode(ctx, ev) + } +} + +abstract class HiveUDFEvaluatorBase( + funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) + extends HiveInspectors with Serializable { + + def foldable: Boolean + + def dataType: DataType + + def functionClass: Class[_ <: AnyRef] + + @transient + val isUDFDeterministic = { + val udfType = functionClass.getAnnotation(classOf[HiveUDFType]) + udfType != null && udfType.deterministic() && !udfType.stateful() + } + + @transient + val deterministic: Boolean = isUDFDeterministic && children.forall(_.deterministic) + + private[hive] def setArg(index: Int, arg: Any): Unit + + def unwrapper: Any => Any + + def result: Any + + final private[hive] def evaluate(): Any = { + val ret = result + unwrapper(ret) + } + + final def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val refEvaluator = ctx.addReferenceObj("evaluator", this) val evals = children.map(_.genCode(ctx)) val setValues = evals.zipWithIndex.map { case (eval, i) => s""" |if (${eval.isNull}) { - | $refTerm.helper().setArg($i, null); + | $refEvaluator.setArg($i, null); |} else { - | $refTerm.helper().setArg($i, ${eval.value}); + | $refEvaluator.setArg($i, ${eval.value}); |} |""".stripMargin } @@ -104,7 +140,7 @@ private[hive] case class HiveSimpleUDF( |$resultType $resultTerm = null; |boolean ${ev.isNull} = false; |try { - | $resultTerm = ($resultType) $refTerm.helper().evaluate(); + | $resultTerm = ($resultType) $refEvaluator.evaluate(); | ${ev.isNull} = $resultTerm == null; |} catch (Throwable e) { | throw QueryExecutionErrors.failedExecuteUserDefinedFunctionError( @@ -122,29 +158,19 @@ private[hive] case class HiveSimpleUDF( } } -class HiveSimpleUDFHelper( +class HiveSimpleUDFEvaluator( funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) - extends HiveInspectors - with Serializable { + extends HiveUDFEvaluatorBase(funcWrapper: HiveFunctionWrapper, children) { - @transient - private[hive] val deterministic = isUDFDeterministic && children.forall(_.deterministic) - - @transient - private[hive] def foldable: Boolean = isUDFDeterministic && children.forall(_.foldable) - - @transient - private[hive] lazy val isUDFDeterministic = { - val udfType = function.getClass.getAnnotation(classOf[HiveUDFType]) - udfType != null && udfType.deterministic() && !udfType.stateful() - } + override def foldable: Boolean = isUDFDeterministic && children.forall(_.foldable) - @transient - private[hive] val dataType = javaTypeToDataType(method.getGenericReturnType) + override def dataType: DataType = javaTypeToDataType(method.getGenericReturnType) @transient private lazy val function = funcWrapper.createFunction[UDF]() + override def functionClass: Class[_ <: AnyRef] = function.getClass + @transient private lazy val method = function.getResolver.getEvalMethod(children.map(_.dataType.toTypeInfo).asJava) @@ -159,23 +185,22 @@ class HiveSimpleUDFHelper( @transient private lazy val conversionHelper = new ConversionHelper(method, arguments) - @transient - private lazy val unwrapper = unwrapperFor(ObjectInspectorFactory.getReflectionObjectInspector( - method.getGenericReturnType, ObjectInspectorOptions.JAVA)) - @transient private lazy val inputs: Array[AnyRef] = new Array[AnyRef](children.length) - private[hive] def setArg(index: Int, arg: Any): Unit = { + override private[hive] def setArg(index: Int, arg: Any): Unit = { inputs(index) = wrappers(index)(arg).asInstanceOf[AnyRef] } - private[hive] def evaluate(): Any = { - val ret = FunctionRegistry.invoke( + override def unwrapper: Any => Any = + unwrapperFor(ObjectInspectorFactory.getReflectionObjectInspector( + method.getGenericReturnType, ObjectInspectorOptions.JAVA)) + + override def result: Any = { + FunctionRegistry.invoke( method, function, conversionHelper.convertIfNecessary(inputs: _*): _*) - unwrapper(ret) } } From 32fc25846db77b1da7baa07509270ea415f11780 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Wed, 15 Mar 2023 17:03:55 +0800 Subject: [PATCH 3/8] [SPARK-42052][SQL] Codegen Support for HiveSimpleUDF --- .../org/apache/spark/sql/hive/hiveUDFs.scala | 54 +++++++------------ 1 file changed, 20 insertions(+), 34 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index 3a3b77521c382..19b4ae487a36b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -48,18 +48,25 @@ import org.apache.spark.sql.types._ private[hive] case class HiveSimpleUDF( name: String, funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) extends Expression + with HiveInspectors with UserDefinedExpression { @transient private lazy val evaluator = new HiveSimpleUDFEvaluator(funcWrapper, children) - override lazy val deterministic: Boolean = evaluator.deterministic + @transient + private val isUDFDeterministic = { + val udfType = evaluator.function.getClass.getAnnotation(classOf[HiveUDFType]) + udfType != null && udfType.deterministic() && !udfType.stateful() + } + + override lazy val deterministic: Boolean = isUDFDeterministic && children.forall(_.deterministic) override def nullable: Boolean = true - override def foldable: Boolean = evaluator.foldable + override def foldable: Boolean = isUDFDeterministic && children.forall(_.foldable) - override lazy val dataType = evaluator.dataType + override lazy val dataType: DataType = javaTypeToDataType(evaluator.method.getGenericReturnType) // TODO: Finish input output types. override def eval(input: InternalRow): Any = { @@ -82,41 +89,29 @@ private[hive] case class HiveSimpleUDF( copy(children = newChildren) protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { - evaluator.doGenCode(ctx, ev) + evaluator.doGenCode(ctx, ev, dataType) } } -abstract class HiveUDFEvaluatorBase( +abstract class HiveUDFEvaluatorBase[UDFType <: AnyRef]( funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) extends HiveInspectors with Serializable { - def foldable: Boolean - - def dataType: DataType - - def functionClass: Class[_ <: AnyRef] - @transient - val isUDFDeterministic = { - val udfType = functionClass.getAnnotation(classOf[HiveUDFType]) - udfType != null && udfType.deterministic() && !udfType.stateful() - } + lazy val function = funcWrapper.createFunction[UDFType]() - @transient - val deterministic: Boolean = isUDFDeterministic && children.forall(_.deterministic) - - private[hive] def setArg(index: Int, arg: Any): Unit + def setArg(index: Int, arg: Any): Unit def unwrapper: Any => Any def result: Any - final private[hive] def evaluate(): Any = { + final def evaluate(): Any = { val ret = result unwrapper(ret) } - final def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + final def doGenCode(ctx: CodegenContext, ev: ExprCode, dataType: DataType): ExprCode = { val refEvaluator = ctx.addReferenceObj("evaluator", this) val evals = children.map(_.genCode(ctx)) @@ -160,20 +155,11 @@ abstract class HiveUDFEvaluatorBase( class HiveSimpleUDFEvaluator( funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) - extends HiveUDFEvaluatorBase(funcWrapper: HiveFunctionWrapper, children) { - - override def foldable: Boolean = isUDFDeterministic && children.forall(_.foldable) - - override def dataType: DataType = javaTypeToDataType(method.getGenericReturnType) - - @transient - private lazy val function = funcWrapper.createFunction[UDF]() - - override def functionClass: Class[_ <: AnyRef] = function.getClass + extends HiveUDFEvaluatorBase[UDF](funcWrapper, children) { @transient - private lazy val method = - function.getResolver.getEvalMethod(children.map(_.dataType.toTypeInfo).asJava) + lazy val method = function.getResolver. + getEvalMethod(children.map(_.dataType.toTypeInfo).asJava) @transient private lazy val wrappers = children.map(x => wrapperFor(toInspector(x), x.dataType)).toArray @@ -188,7 +174,7 @@ class HiveSimpleUDFEvaluator( @transient private lazy val inputs: Array[AnyRef] = new Array[AnyRef](children.length) - override private[hive] def setArg(index: Int, arg: Any): Unit = { + override def setArg(index: Int, arg: Any): Unit = { inputs(index) = wrappers(index)(arg).asInstanceOf[AnyRef] } From 30d4edf3f305d88ad478e611645d840460ce6f37 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Wed, 15 Mar 2023 20:24:20 +0800 Subject: [PATCH 4/8] [SPARK-42052][SQL] Codegen Support for HiveSimpleUDF --- .../org/apache/spark/sql/hive/hiveUDFs.scala | 65 +++---------------- 1 file changed, 9 insertions(+), 56 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index c94f946747ec4..75bf04e6b5be3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -31,7 +31,6 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils.ConversionHelper import org.apache.hadoop.hive.serde2.objectinspector.{ConstantObjectInspector, ObjectInspector, ObjectInspectorFactory} import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions -import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -102,14 +101,7 @@ abstract class HiveUDFEvaluatorBase[UDFType <: AnyRef]( def setArg(index: Int, arg: Any): Unit - def unwrapper: Any => Any - - def result: Any - - final def evaluate(): Any = { - val ret = result - unwrapper(ret) - } + def evaluate(): Any final def doGenCode(ctx: CodegenContext, ev: ExprCode, dataType: DataType): ExprCode = { val refEvaluator = ctx.addReferenceObj("evaluator", this) @@ -178,15 +170,17 @@ class HiveSimpleUDFEvaluator( inputs(index) = wrappers(index)(arg).asInstanceOf[AnyRef] } - override def unwrapper: Any => Any = + @transient + private lazy val unwrapper: Any => Any = unwrapperFor(ObjectInspectorFactory.getReflectionObjectInspector( method.getGenericReturnType, ObjectInspectorOptions.JAVA)) - override def result: Any = { - FunctionRegistry.invoke( + override def evaluate(): Any = { + val ret = FunctionRegistry.invoke( method, function, conversionHelper.convertIfNecessary(inputs: _*): _*) + unwrapper(ret) } } @@ -245,54 +239,13 @@ private[hive] case class HiveGenericUDF( copy(children = newChildren) protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { - val refEvaluator = ctx.addReferenceObj("evaluator", evaluator) - val evals = children.map(_.genCode(ctx)) - - val setValues = evals.zipWithIndex.map { - case (eval, i) => - s""" - |if (${eval.isNull}) { - | $refEvaluator.setArg($i, null); - |} else { - | $refEvaluator.setArg($i, ${eval.value}); - |} - |""".stripMargin - } - - val resultType = CodeGenerator.boxedType(dataType) - val resultTerm = ctx.freshName("result") - ev.copy(code = - code""" - |${evals.map(_.code).mkString("\n")} - |${setValues.mkString("\n")} - |$resultType $resultTerm = null; - |boolean ${ev.isNull} = false; - |try { - | $resultTerm = ($resultType) $refEvaluator.evaluate(); - | ${ev.isNull} = $resultTerm == null; - |} catch (Throwable e) { - | throw QueryExecutionErrors.failedExecuteUserDefinedFunctionError( - | "${funcWrapper.functionClassName}", - | "${children.map(_.dataType.catalogString).mkString(", ")}", - | "${dataType.catalogString}", - | e); - |} - |${CodeGenerator.javaType(dataType)} ${ev.value} = ${CodeGenerator.defaultValue(dataType)}; - |if (!${ev.isNull}) { - | ${ev.value} = $resultTerm; - |} - |""".stripMargin - ) + evaluator.doGenCode(ctx, ev, dataType) } } class HiveGenericUDFEvaluator( funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) - extends HiveInspectors - with Serializable { - - @transient - lazy val function = funcWrapper.createFunction[GenericUDF]() + extends HiveUDFEvaluatorBase[GenericUDF](funcWrapper, children) { @transient private lazy val argumentInspectors = children.map(toInspector) @@ -313,7 +266,7 @@ class HiveGenericUDFEvaluator( def setArg(index: Int, arg: Any): Unit = deferredObjects(index).asInstanceOf[DeferredObjectAdapter].set(arg) - def evaluate(): Any = unwrapper(function.evaluate(deferredObjects)) + override def evaluate(): Any = unwrapper(function.evaluate(deferredObjects)) } /** From 61ce1ba26cbd6e29df39657c8d29b0019f4228e9 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Wed, 15 Mar 2023 22:08:02 +0800 Subject: [PATCH 5/8] [SPARK-42052][SQL] Codegen Support for HiveSimpleUDF --- .../org/apache/spark/sql/hive/hiveUDFs.scala | 74 ++++++++++++++----- 1 file changed, 54 insertions(+), 20 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index 75bf04e6b5be3..1b5aed6ebef7f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -88,23 +88,7 @@ private[hive] case class HiveSimpleUDF( copy(children = newChildren) protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { - evaluator.doGenCode(ctx, ev, dataType) - } -} - -abstract class HiveUDFEvaluatorBase[UDFType <: AnyRef]( - funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) - extends HiveInspectors with Serializable { - - @transient - lazy val function = funcWrapper.createFunction[UDFType]() - - def setArg(index: Int, arg: Any): Unit - - def evaluate(): Any - - final def doGenCode(ctx: CodegenContext, ev: ExprCode, dataType: DataType): ExprCode = { - val refEvaluator = ctx.addReferenceObj("evaluator", this) + val refEvaluator = ctx.addReferenceObj("evaluator", evaluator) val evals = children.map(_.genCode(ctx)) val setValues = evals.zipWithIndex.map { @@ -145,9 +129,22 @@ abstract class HiveUDFEvaluatorBase[UDFType <: AnyRef]( } } +abstract class HiveUDFEvaluatorBase[UDFType <: AnyRef]( + funcWrapper: HiveFunctionWrapper) + extends HiveInspectors with Serializable { + + @transient + lazy val function = funcWrapper.createFunction[UDFType]() + + def setArg(index: Int, arg: Any): Unit + + def evaluate(): Any + +} + class HiveSimpleUDFEvaluator( funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) - extends HiveUDFEvaluatorBase[UDF](funcWrapper, children) { + extends HiveUDFEvaluatorBase[UDF](funcWrapper) { @transient lazy val method = function.getResolver. @@ -239,13 +236,50 @@ private[hive] case class HiveGenericUDF( copy(children = newChildren) protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { - evaluator.doGenCode(ctx, ev, dataType) + val refEvaluator = ctx.addReferenceObj("evaluator", evaluator) + val evals = children.map(_.genCode(ctx)) + + val setValues = evals.zipWithIndex.map { + case (eval, i) => + s""" + |if (${eval.isNull}) { + | $refEvaluator.setArg($i, null); + |} else { + | $refEvaluator.setArg($i, ${eval.value}); + |} + |""".stripMargin + } + + val resultType = CodeGenerator.boxedType(dataType) + val resultTerm = ctx.freshName("result") + ev.copy(code = + code""" + |${evals.map(_.code).mkString("\n")} + |${setValues.mkString("\n")} + |$resultType $resultTerm = null; + |boolean ${ev.isNull} = false; + |try { + | $resultTerm = ($resultType) $refEvaluator.evaluate(); + | ${ev.isNull} = $resultTerm == null; + |} catch (Throwable e) { + | throw QueryExecutionErrors.failedExecuteUserDefinedFunctionError( + | "${funcWrapper.functionClassName}", + | "${children.map(_.dataType.catalogString).mkString(", ")}", + | "${dataType.catalogString}", + | e); + |} + |${CodeGenerator.javaType(dataType)} ${ev.value} = ${CodeGenerator.defaultValue(dataType)}; + |if (!${ev.isNull}) { + | ${ev.value} = $resultTerm; + |} + |""".stripMargin + ) } } class HiveGenericUDFEvaluator( funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) - extends HiveUDFEvaluatorBase[GenericUDF](funcWrapper, children) { + extends HiveUDFEvaluatorBase[GenericUDF](funcWrapper) { @transient private lazy val argumentInspectors = children.map(toInspector) From 218dd449a84514544e03e85e48211ef8384c8f0a Mon Sep 17 00:00:00 2001 From: panbingkun Date: Wed, 15 Mar 2023 22:22:38 +0800 Subject: [PATCH 6/8] [SPARK-42052][SQL] Codegen Support for HiveSimpleUDF --- .../main/scala/org/apache/spark/sql/hive/hiveUDFs.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index 1b5aed6ebef7f..b612f6aae20d1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -53,16 +53,16 @@ private[hive] case class HiveSimpleUDF( @transient private lazy val evaluator = new HiveSimpleUDFEvaluator(funcWrapper, children) + override lazy val deterministic: Boolean = isUDFDeterministic && children.forall(_.deterministic) + + override def nullable: Boolean = true + @transient private val isUDFDeterministic = { val udfType = evaluator.function.getClass.getAnnotation(classOf[HiveUDFType]) udfType != null && udfType.deterministic() && !udfType.stateful() } - override lazy val deterministic: Boolean = isUDFDeterministic && children.forall(_.deterministic) - - override def nullable: Boolean = true - override def foldable: Boolean = isUDFDeterministic && children.forall(_.foldable) override lazy val dataType: DataType = javaTypeToDataType(evaluator.method.getGenericReturnType) From b422a8beb829b888156a5f86e766ce1c769600d4 Mon Sep 17 00:00:00 2001 From: panbingkun Date: Fri, 17 Mar 2023 19:37:10 +0800 Subject: [PATCH 7/8] [SPARK-42052][SQL] Codegen Support for HiveSimpleUDF --- .../spark/sql/hive/hiveUDFEvaluators.scala | 148 ++++++++++++++++++ .../org/apache/spark/sql/hive/hiveUDFs.scala | 147 ++--------------- 2 files changed, 158 insertions(+), 137 deletions(-) create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFEvaluators.scala diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFEvaluators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFEvaluators.scala new file mode 100644 index 0000000000000..bed38449b75b0 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFEvaluators.scala @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.hive.ql.exec.{FunctionRegistry, UDF} +import org.apache.hadoop.hive.ql.udf.{UDFType => HiveUDFType} +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF._ +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils.ConversionHelper +import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory} +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions + +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper +import org.apache.spark.sql.types.DataType + +abstract class HiveUDFEvaluatorBase[UDFType <: AnyRef]( + funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) + extends HiveInspectors with Serializable { + + @transient + lazy val function = funcWrapper.createFunction[UDFType]() + + @transient + val isUDFDeterministic = { + val udfType = function.getClass.getAnnotation(classOf[HiveUDFType]) + udfType != null && udfType.deterministic() && !udfType.stateful() + } + + def returnType: DataType + + def setArg(index: Int, arg: Any): Unit + + def doEvaluate(): Any + + final def evaluate(): Any = { + try { + doEvaluate() + } catch { + case e: Throwable => + throw QueryExecutionErrors.failedExecuteUserDefinedFunctionError( + s"${funcWrapper.functionClassName}", + s"${children.map(_.dataType.catalogString).mkString(", ")}", + s"${returnType.catalogString}", + e) + } + } +} + +class HiveSimpleUDFEvaluator( + funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) + extends HiveUDFEvaluatorBase[UDF](funcWrapper, children) { + + @transient + lazy val method = function.getResolver. + getEvalMethod(children.map(_.dataType.toTypeInfo).asJava) + + @transient + private lazy val wrappers = children.map(x => wrapperFor(toInspector(x), x.dataType)).toArray + + @transient + private lazy val arguments = children.map(toInspector).toArray + + // Create parameter converters + @transient + private lazy val conversionHelper = new ConversionHelper(method, arguments) + + @transient + private lazy val inputs: Array[AnyRef] = new Array[AnyRef](children.length) + + override def returnType: DataType = javaTypeToDataType(method.getGenericReturnType) + + override def setArg(index: Int, arg: Any): Unit = { + inputs(index) = wrappers(index)(arg).asInstanceOf[AnyRef] + } + + @transient + private lazy val unwrapper: Any => Any = + unwrapperFor(ObjectInspectorFactory.getReflectionObjectInspector( + method.getGenericReturnType, ObjectInspectorOptions.JAVA)) + + override def doEvaluate(): Any = { + val ret = FunctionRegistry.invoke( + method, + function, + conversionHelper.convertIfNecessary(inputs: _*): _*) + unwrapper(ret) + } +} + +class HiveGenericUDFEvaluator( + funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) + extends HiveUDFEvaluatorBase[GenericUDF](funcWrapper, children) { + + @transient + private lazy val argumentInspectors = children.map(toInspector) + + @transient + lazy val returnInspector = { + function.initializeAndFoldConstants(argumentInspectors.toArray) + } + + @transient + private lazy val deferredObjects: Array[DeferredObject] = argumentInspectors.zip(children).map { + case (inspect, child) => new DeferredObjectAdapter(inspect, child.dataType) + }.toArray[DeferredObject] + + @transient + private lazy val unwrapper: Any => Any = unwrapperFor(returnInspector) + + override def returnType: DataType = inspectorToDataType(returnInspector) + + def setArg(index: Int, arg: Any): Unit = + deferredObjects(index).asInstanceOf[DeferredObjectAdapter].set(arg) + + override def doEvaluate(): Any = unwrapper(function.evaluate(deferredObjects)) +} + +// Adapter from Catalyst ExpressionResult to Hive DeferredObject +private[hive] class DeferredObjectAdapter(oi: ObjectInspector, dataType: DataType) + extends DeferredObject with HiveInspectors { + + private val wrapper = wrapperFor(oi, dataType) + private var func: Any = _ + def set(func: Any): Unit = { + this.func = func + } + override def prepare(i: Int): Unit = {} + override def get(): AnyRef = wrapper(func).asInstanceOf[AnyRef] +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala index b612f6aae20d1..b07a1b717e73e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala @@ -23,13 +23,9 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import org.apache.hadoop.hive.ql.exec._ -import org.apache.hadoop.hive.ql.udf.{UDFType => HiveUDFType} import org.apache.hadoop.hive.ql.udf.generic._ import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer -import org.apache.hadoop.hive.ql.udf.generic.GenericUDF._ -import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils.ConversionHelper import org.apache.hadoop.hive.serde2.objectinspector.{ConstantObjectInspector, ObjectInspector, ObjectInspectorFactory} -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -53,25 +49,19 @@ private[hive] case class HiveSimpleUDF( @transient private lazy val evaluator = new HiveSimpleUDFEvaluator(funcWrapper, children) - override lazy val deterministic: Boolean = isUDFDeterministic && children.forall(_.deterministic) + override lazy val deterministic: Boolean = + evaluator.isUDFDeterministic && children.forall(_.deterministic) override def nullable: Boolean = true - @transient - private val isUDFDeterministic = { - val udfType = evaluator.function.getClass.getAnnotation(classOf[HiveUDFType]) - udfType != null && udfType.deterministic() && !udfType.stateful() - } - - override def foldable: Boolean = isUDFDeterministic && children.forall(_.foldable) + override def foldable: Boolean = evaluator.isUDFDeterministic && children.forall(_.foldable) override lazy val dataType: DataType = javaTypeToDataType(evaluator.method.getGenericReturnType) // TODO: Finish input output types. override def eval(input: InternalRow): Any = { children.zipWithIndex.map { - case (child, idx) => - evaluator.setArg(idx, child.eval(input)) + case (child, idx) => evaluator.setArg(idx, child.eval(input)) } evaluator.evaluate() } @@ -108,18 +98,8 @@ private[hive] case class HiveSimpleUDF( code""" |${evals.map(_.code).mkString("\n")} |${setValues.mkString("\n")} - |$resultType $resultTerm = null; - |boolean ${ev.isNull} = false; - |try { - | $resultTerm = ($resultType) $refEvaluator.evaluate(); - | ${ev.isNull} = $resultTerm == null; - |} catch (Throwable e) { - | throw QueryExecutionErrors.failedExecuteUserDefinedFunctionError( - | "${funcWrapper.functionClassName}", - | "${children.map(_.dataType.catalogString).mkString(", ")}", - | "${dataType.catalogString}", - | e); - |} + |$resultType $resultTerm = ($resultType) $refEvaluator.evaluate(); + |boolean ${ev.isNull} = $resultTerm == null; |${CodeGenerator.javaType(dataType)} ${ev.value} = ${CodeGenerator.defaultValue(dataType)}; |if (!${ev.isNull}) { | ${ev.value} = $resultTerm; @@ -129,71 +109,6 @@ private[hive] case class HiveSimpleUDF( } } -abstract class HiveUDFEvaluatorBase[UDFType <: AnyRef]( - funcWrapper: HiveFunctionWrapper) - extends HiveInspectors with Serializable { - - @transient - lazy val function = funcWrapper.createFunction[UDFType]() - - def setArg(index: Int, arg: Any): Unit - - def evaluate(): Any - -} - -class HiveSimpleUDFEvaluator( - funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) - extends HiveUDFEvaluatorBase[UDF](funcWrapper) { - - @transient - lazy val method = function.getResolver. - getEvalMethod(children.map(_.dataType.toTypeInfo).asJava) - - @transient - private lazy val wrappers = children.map(x => wrapperFor(toInspector(x), x.dataType)).toArray - - @transient - private lazy val arguments = children.map(toInspector).toArray - - // Create parameter converters - @transient - private lazy val conversionHelper = new ConversionHelper(method, arguments) - - @transient - private lazy val inputs: Array[AnyRef] = new Array[AnyRef](children.length) - - override def setArg(index: Int, arg: Any): Unit = { - inputs(index) = wrappers(index)(arg).asInstanceOf[AnyRef] - } - - @transient - private lazy val unwrapper: Any => Any = - unwrapperFor(ObjectInspectorFactory.getReflectionObjectInspector( - method.getGenericReturnType, ObjectInspectorOptions.JAVA)) - - override def evaluate(): Any = { - val ret = FunctionRegistry.invoke( - method, - function, - conversionHelper.convertIfNecessary(inputs: _*): _*) - unwrapper(ret) - } -} - -// Adapter from Catalyst ExpressionResult to Hive DeferredObject -private[hive] class DeferredObjectAdapter(oi: ObjectInspector, dataType: DataType) - extends DeferredObject with HiveInspectors { - - private val wrapper = wrapperFor(oi, dataType) - private var func: Any = _ - def set(func: Any): Unit = { - this.func = func - } - override def prepare(i: Int): Unit = {} - override def get(): AnyRef = wrapper(func).asInstanceOf[AnyRef] -} - private[hive] case class HiveGenericUDF( name: String, funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) extends Expression @@ -203,9 +118,9 @@ private[hive] case class HiveGenericUDF( override def nullable: Boolean = true override lazy val deterministic: Boolean = - isUDFDeterministic && children.forall(_.deterministic) + evaluator.isUDFDeterministic && children.forall(_.deterministic) - override def foldable: Boolean = isUDFDeterministic && + override def foldable: Boolean = evaluator.isUDFDeterministic && evaluator.returnInspector.isInstanceOf[ConstantObjectInspector] override lazy val dataType: DataType = inspectorToDataType(evaluator.returnInspector) @@ -213,12 +128,6 @@ private[hive] case class HiveGenericUDF( @transient private lazy val evaluator = new HiveGenericUDFEvaluator(funcWrapper, children) - @transient - private val isUDFDeterministic = { - val udfType = evaluator.function.getClass.getAnnotation(classOf[HiveUDFType]) - udfType != null && udfType.deterministic() && !udfType.stateful() - } - override def eval(input: InternalRow): Any = { children.zipWithIndex.map { case (child, idx) => evaluator.setArg(idx, child.eval(input)) @@ -256,18 +165,8 @@ private[hive] case class HiveGenericUDF( code""" |${evals.map(_.code).mkString("\n")} |${setValues.mkString("\n")} - |$resultType $resultTerm = null; - |boolean ${ev.isNull} = false; - |try { - | $resultTerm = ($resultType) $refEvaluator.evaluate(); - | ${ev.isNull} = $resultTerm == null; - |} catch (Throwable e) { - | throw QueryExecutionErrors.failedExecuteUserDefinedFunctionError( - | "${funcWrapper.functionClassName}", - | "${children.map(_.dataType.catalogString).mkString(", ")}", - | "${dataType.catalogString}", - | e); - |} + |$resultType $resultTerm = ($resultType) $refEvaluator.evaluate(); + |boolean ${ev.isNull} = $resultTerm == null; |${CodeGenerator.javaType(dataType)} ${ev.value} = ${CodeGenerator.defaultValue(dataType)}; |if (!${ev.isNull}) { | ${ev.value} = $resultTerm; @@ -277,32 +176,6 @@ private[hive] case class HiveGenericUDF( } } -class HiveGenericUDFEvaluator( - funcWrapper: HiveFunctionWrapper, children: Seq[Expression]) - extends HiveUDFEvaluatorBase[GenericUDF](funcWrapper) { - - @transient - private lazy val argumentInspectors = children.map(toInspector) - - @transient - lazy val returnInspector = { - function.initializeAndFoldConstants(argumentInspectors.toArray) - } - - @transient - private lazy val deferredObjects: Array[DeferredObject] = argumentInspectors.zip(children).map { - case (inspect, child) => new DeferredObjectAdapter(inspect, child.dataType) - }.toArray[DeferredObject] - - @transient - private lazy val unwrapper: Any => Any = unwrapperFor(returnInspector) - - def setArg(index: Int, arg: Any): Unit = - deferredObjects(index).asInstanceOf[DeferredObjectAdapter].set(arg) - - override def evaluate(): Any = unwrapper(function.evaluate(deferredObjects)) -} - /** * Converts a Hive Generic User Defined Table Generating Function (UDTF) to a * `Generator`. Note that the semantics of Generators do not allow From a463d74074302fd543e3fa1a7e0dc701f8dc99cf Mon Sep 17 00:00:00 2001 From: panbingkun Date: Fri, 17 Mar 2023 22:33:20 +0800 Subject: [PATCH 8/8] [SPARK-42052][SQL] Codegen Support for HiveSimpleUDF --- .../scala/org/apache/spark/sql/hive/hiveUDFEvaluators.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFEvaluators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFEvaluators.scala index bed38449b75b0..094f8ba7a0f89 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFEvaluators.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFEvaluators.scala @@ -40,7 +40,7 @@ abstract class HiveUDFEvaluatorBase[UDFType <: AnyRef]( lazy val function = funcWrapper.createFunction[UDFType]() @transient - val isUDFDeterministic = { + lazy val isUDFDeterministic = { val udfType = function.getClass.getAnnotation(classOf[HiveUDFType]) udfType != null && udfType.deterministic() && !udfType.stateful() }