@@ -0,0 +1,148 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive

import scala.collection.JavaConverters._

import org.apache.hadoop.hive.ql.exec.{FunctionRegistry, UDF}
import org.apache.hadoop.hive.ql.udf.{UDFType => HiveUDFType}
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF._
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils.ConversionHelper
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory}
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
import org.apache.spark.sql.types.DataType

abstract class HiveUDFEvaluatorBase[UDFType <: AnyRef](
funcWrapper: HiveFunctionWrapper, children: Seq[Expression])
extends HiveInspectors with Serializable {

@transient
lazy val function = funcWrapper.createFunction[UDFType]()

@transient
lazy val isUDFDeterministic = {
val udfType = function.getClass.getAnnotation(classOf[HiveUDFType])
udfType != null && udfType.deterministic() && !udfType.stateful()
}

def returnType: DataType
Contributor Author:
We have to add this method here because it will be used in exception handling.


def setArg(index: Int, arg: Any): Unit

def doEvaluate(): Any

final def evaluate(): Any = {
try {
doEvaluate()
} catch {
case e: Throwable =>
throw QueryExecutionErrors.failedExecuteUserDefinedFunctionError(
s"${funcWrapper.functionClassName}",
s"${children.map(_.dataType.catalogString).mkString(", ")}",
s"${returnType.catalogString}",
e)
}
}
}
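
As the review comment above notes, returnType lives on the base class because the shared evaluate() wrapper reports the expected result type when the UDF throws. The following is an illustrative, self-contained sketch of that template-method shape, with hypothetical names and no Hive or Spark dependencies; it is not part of the patch.

// Illustrative sketch only: hypothetical names, plain Scala, no Hive types.
abstract class ExampleEvaluatorBase {
  // Counterpart of the abstract returnType: DataType above; only needed by the shared error path.
  def returnType: String
  def setArg(index: Int, arg: Any): Unit
  def doEvaluate(): Any

  final def evaluate(): Any = {
    try {
      doEvaluate()
    } catch {
      case e: Throwable =>
        // Mirrors failedExecuteUserDefinedFunctionError: the expected return type is part
        // of the error message, which is why the base class must expose it.
        throw new RuntimeException(s"Failed to execute UDF, expected return type: $returnType", e)
    }
  }
}

// A concrete evaluator: setArg stores raw arguments, doEvaluate computes the result.
class ExampleUpperEvaluator extends ExampleEvaluatorBase {
  private val args = new Array[Any](1)
  override def returnType: String = "string"
  override def setArg(index: Int, arg: Any): Unit = args(index) = arg
  override def doEvaluate(): Any = args(0).asInstanceOf[String].toUpperCase
}
// Usage: val e = new ExampleUpperEvaluator(); e.setArg(0, "spark"); e.evaluate() returns "SPARK"
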

class HiveSimpleUDFEvaluator(
funcWrapper: HiveFunctionWrapper, children: Seq[Expression])
extends HiveUDFEvaluatorBase[UDF](funcWrapper, children) {

@transient
lazy val method = function.getResolver.
getEvalMethod(children.map(_.dataType.toTypeInfo).asJava)

@transient
private lazy val wrappers = children.map(x => wrapperFor(toInspector(x), x.dataType)).toArray

@transient
private lazy val arguments = children.map(toInspector).toArray

// Create parameter converters
@transient
private lazy val conversionHelper = new ConversionHelper(method, arguments)

@transient
private lazy val inputs: Array[AnyRef] = new Array[AnyRef](children.length)

override def returnType: DataType = javaTypeToDataType(method.getGenericReturnType)

override def setArg(index: Int, arg: Any): Unit = {
inputs(index) = wrappers(index)(arg).asInstanceOf[AnyRef]
}

@transient
private lazy val unwrapper: Any => Any =
unwrapperFor(ObjectInspectorFactory.getReflectionObjectInspector(
method.getGenericReturnType, ObjectInspectorOptions.JAVA))

override def doEvaluate(): Any = {
val ret = FunctionRegistry.invoke(
method,
function,
conversionHelper.convertIfNecessary(inputs: _*): _*)
unwrapper(ret)
}
}
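
A hedged usage sketch of the calling convention this evaluator expects: one setArg per child expression, then a single evaluate(). The UDF class name below is hypothetical (any class extending org.apache.hadoop.hive.ql.exec.UDF would do), Literal stands in for real child expressions, and UTF8String is the Catalyst-side string value; HiveFunctionWrapper is already imported at the top of this file.

// Hypothetical usage sketch; "com.example.hive.MyUpperUDF" is a stand-in, not a Hive built-in.
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.unsafe.types.UTF8String

val simpleEvaluator = new HiveSimpleUDFEvaluator(
  HiveFunctionWrapper("com.example.hive.MyUpperUDF"),
  Seq(Literal("spark")))
simpleEvaluator.setArg(0, UTF8String.fromString("spark"))  // wrapped into the Hive representation
val upper = simpleEvaluator.evaluate()                     // invoked reflectively, then unwrapped
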

class HiveGenericUDFEvaluator(
funcWrapper: HiveFunctionWrapper, children: Seq[Expression])
extends HiveUDFEvaluatorBase[GenericUDF](funcWrapper, children) {

@transient
private lazy val argumentInspectors = children.map(toInspector)

@transient
lazy val returnInspector = {
function.initializeAndFoldConstants(argumentInspectors.toArray)
}

@transient
private lazy val deferredObjects: Array[DeferredObject] = argumentInspectors.zip(children).map {
case (inspect, child) => new DeferredObjectAdapter(inspect, child.dataType)
}.toArray[DeferredObject]

@transient
private lazy val unwrapper: Any => Any = unwrapperFor(returnInspector)

override def returnType: DataType = inspectorToDataType(returnInspector)

def setArg(index: Int, arg: Any): Unit =
deferredObjects(index).asInstanceOf[DeferredObjectAdapter].set(arg)

override def doEvaluate(): Any = unwrapper(function.evaluate(deferredObjects))
}

// Adapter from Catalyst ExpressionResult to Hive DeferredObject
private[hive] class DeferredObjectAdapter(oi: ObjectInspector, dataType: DataType)
extends DeferredObject with HiveInspectors {

private val wrapper = wrapperFor(oi, dataType)
private var func: Any = _
def set(func: Any): Unit = {
this.func = func
}
override def prepare(i: Int): Unit = {}
override def get(): AnyRef = wrapper(func).asInstanceOf[AnyRef]
}
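
The generic path follows the same convention, but arguments flow through the DeferredObjectAdapter instances, so they are only wrapped into Hive's representation when the UDF actually calls get(). A hedged sketch, assuming Hive's GenericUDFConcat is available from the Hive dependency and reusing Literal and UTF8String from the previous sketch:

// Usage sketch for the generic evaluator; results are "expected", since literal children
// may also be constant-folded by initializeAndFoldConstants.
val genericEvaluator = new HiveGenericUDFEvaluator(
  HiveFunctionWrapper("org.apache.hadoop.hive.ql.udf.generic.GenericUDFConcat"),
  Seq(Literal("a"), Literal("b")))
genericEvaluator.setArg(0, UTF8String.fromString("a"))
genericEvaluator.setArg(1, UTF8String.fromString("b"))
genericEvaluator.evaluate()  // expected to unwrap to the UTF8String "ab"
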
147 changes: 42 additions & 105 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala
@@ -23,15 +23,10 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.hive.ql.exec._
import org.apache.hadoop.hive.ql.udf.{UDFType => HiveUDFType}
import org.apache.hadoop.hive.ql.udf.generic._
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF._
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils.ConversionHelper
import org.apache.hadoop.hive.serde2.objectinspector.{ConstantObjectInspector, ObjectInspector, ObjectInspectorFactory}
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -49,56 +44,26 @@ private[hive] case class HiveSimpleUDF(
name: String, funcWrapper: HiveFunctionWrapper, children: Seq[Expression])
extends Expression
with HiveInspectors
with CodegenFallback
with Logging
with UserDefinedExpression {

override lazy val deterministic: Boolean = isUDFDeterministic && children.forall(_.deterministic)

override def nullable: Boolean = true

@transient
lazy val function = funcWrapper.createFunction[UDF]()

@transient
private lazy val method =
function.getResolver.getEvalMethod(children.map(_.dataType.toTypeInfo).asJava)

@transient
private lazy val arguments = children.map(toInspector).toArray

@transient
private lazy val isUDFDeterministic = {
val udfType = function.getClass.getAnnotation(classOf[HiveUDFType])
udfType != null && udfType.deterministic() && !udfType.stateful()
}
private lazy val evaluator = new HiveSimpleUDFEvaluator(funcWrapper, children)

override def foldable: Boolean = isUDFDeterministic && children.forall(_.foldable)
override lazy val deterministic: Boolean =
evaluator.isUDFDeterministic && children.forall(_.deterministic)

// Create parameter converters
@transient
private lazy val conversionHelper = new ConversionHelper(method, arguments)
override def nullable: Boolean = true

override lazy val dataType = javaTypeToDataType(method.getGenericReturnType)
override def foldable: Boolean = evaluator.isUDFDeterministic && children.forall(_.foldable)

@transient
private lazy val wrappers = children.map(x => wrapperFor(toInspector(x), x.dataType)).toArray

@transient
lazy val unwrapper = unwrapperFor(ObjectInspectorFactory.getReflectionObjectInspector(
method.getGenericReturnType, ObjectInspectorOptions.JAVA))

@transient
private lazy val cached: Array[AnyRef] = new Array[AnyRef](children.length)
override lazy val dataType: DataType = javaTypeToDataType(evaluator.method.getGenericReturnType)

// TODO: Finish input output types.
override def eval(input: InternalRow): Any = {
val inputs = wrap(children.map(_.eval(input)), wrappers, cached)
val ret = FunctionRegistry.invoke(
method,
function,
conversionHelper.convertIfNecessary(inputs : _*): _*)
unwrapper(ret)
children.zipWithIndex.map {
case (child, idx) => evaluator.setArg(idx, child.eval(input))
}
evaluator.evaluate()
}

override def toString: String = {
Expand All @@ -111,19 +76,37 @@ private[hive] case class HiveSimpleUDF(

override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression =
copy(children = newChildren)
}

// Adapter from Catalyst ExpressionResult to Hive DeferredObject
private[hive] class DeferredObjectAdapter(oi: ObjectInspector, dataType: DataType)
extends DeferredObject with HiveInspectors {
protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val refEvaluator = ctx.addReferenceObj("evaluator", evaluator)
val evals = children.map(_.genCode(ctx))

val setValues = evals.zipWithIndex.map {
case (eval, i) =>
s"""
|if (${eval.isNull}) {
| $refEvaluator.setArg($i, null);
|} else {
| $refEvaluator.setArg($i, ${eval.value});
|}
|""".stripMargin
}

private val wrapper = wrapperFor(oi, dataType)
private var func: Any = _
def set(func: Any): Unit = {
this.func = func
val resultType = CodeGenerator.boxedType(dataType)
val resultTerm = ctx.freshName("result")
ev.copy(code =
code"""
|${evals.map(_.code).mkString("\n")}
|${setValues.mkString("\n")}
|$resultType $resultTerm = ($resultType) $refEvaluator.evaluate();
|boolean ${ev.isNull} = $resultTerm == null;
|${CodeGenerator.javaType(dataType)} ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
|if (!${ev.isNull}) {
| ${ev.value} = $resultTerm;
|}
|""".stripMargin
)
}
override def prepare(i: Int): Unit = {}
override def get(): AnyRef = wrapper(func).asInstanceOf[AnyRef]
}

private[hive] case class HiveGenericUDF(
@@ -135,22 +118,16 @@ private[hive] case class HiveGenericUDF(
override def nullable: Boolean = true

override lazy val deterministic: Boolean =
isUDFDeterministic && children.forall(_.deterministic)
evaluator.isUDFDeterministic && children.forall(_.deterministic)

override def foldable: Boolean = isUDFDeterministic &&
override def foldable: Boolean = evaluator.isUDFDeterministic &&
evaluator.returnInspector.isInstanceOf[ConstantObjectInspector]

override lazy val dataType: DataType = inspectorToDataType(evaluator.returnInspector)

@transient
private lazy val evaluator = new HiveGenericUDFEvaluator(funcWrapper, children)

@transient
private val isUDFDeterministic = {
val udfType = evaluator.function.getClass.getAnnotation(classOf[HiveUDFType])
udfType != null && udfType.deterministic() && !udfType.stateful()
}

override def eval(input: InternalRow): Any = {
children.zipWithIndex.map {
case (child, idx) => evaluator.setArg(idx, child.eval(input))
@@ -188,18 +165,8 @@ private[hive] case class HiveGenericUDF(
code"""
|${evals.map(_.code).mkString("\n")}
|${setValues.mkString("\n")}
|$resultType $resultTerm = null;
|boolean ${ev.isNull} = false;
|try {
| $resultTerm = ($resultType) $refEvaluator.evaluate();
| ${ev.isNull} = $resultTerm == null;
|} catch (Throwable e) {
| throw QueryExecutionErrors.failedExecuteUserDefinedFunctionError(
| "${funcWrapper.functionClassName}",
| "${children.map(_.dataType.catalogString).mkString(", ")}",
| "${dataType.catalogString}",
| e);
|}
|$resultType $resultTerm = ($resultType) $refEvaluator.evaluate();
|boolean ${ev.isNull} = $resultTerm == null;
|${CodeGenerator.javaType(dataType)} ${ev.value} = ${CodeGenerator.defaultValue(dataType)};
|if (!${ev.isNull}) {
| ${ev.value} = $resultTerm;
@@ -209,36 +176,6 @@
}
}

class HiveGenericUDFEvaluator(
funcWrapper: HiveFunctionWrapper, children: Seq[Expression])
extends HiveInspectors
with Serializable {

@transient
lazy val function = funcWrapper.createFunction[GenericUDF]()

@transient
private lazy val argumentInspectors = children.map(toInspector)

@transient
lazy val returnInspector = {
function.initializeAndFoldConstants(argumentInspectors.toArray)
}

@transient
private lazy val deferredObjects: Array[DeferredObject] = argumentInspectors.zip(children).map {
case (inspect, child) => new DeferredObjectAdapter(inspect, child.dataType)
}.toArray[DeferredObject]

@transient
private lazy val unwrapper: Any => Any = unwrapperFor(returnInspector)

def setArg(index: Int, arg: Any): Unit =
deferredObjects(index).asInstanceOf[DeferredObjectAdapter].set(arg)

def evaluate(): Any = unwrapper(function.evaluate(deferredObjects))
}

/**
* Converts a Hive Generic User Defined Table Generating Function (UDTF) to a
* `Generator`. Note that the semantics of Generators do not allow