Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;

import java.util.Arrays;

import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;

/** Catalog methods for working with Functions. */
Expand All @@ -54,7 +52,7 @@ default Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceExc
return new Identifier[0];
}

throw new RuntimeException("Namespace " + Arrays.toString(namespace) + " is not valid");
throw new NoSuchNamespaceException(namespace);
}

@Override
Expand All @@ -66,6 +64,6 @@ default UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionExce
}
}

throw new RuntimeException("Function " + ident + " is not a paimon function");
throw new NoSuchFunctionException(ident);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import org.apache.paimon.spark.catalog.functions.PaimonFunctions

import org.apache.spark.sql.Row
import org.apache.spark.sql.connector.catalog.{FunctionCatalog, Identifier}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, IntegerType, StructType}

class PaimonFunctionTest extends PaimonHiveTestBase {

Expand Down Expand Up @@ -85,4 +87,49 @@ class PaimonFunctionTest extends PaimonHiveTestBase {
)
}
}

test("Paimon function: show and load function with SparkGenericCatalog") {
sql(s"USE $sparkCatalogName")
sql(s"USE $hiveDbName")
sql("CREATE FUNCTION myIntSum AS 'org.apache.paimon.spark.sql.MyIntSum'")
checkAnswer(
sql(s"SHOW FUNCTIONS FROM $hiveDbName LIKE 'myIntSum'"),
Row("spark_catalog.test_hive.myintsum"))

withTable("t") {
sql("CREATE TABLE t (id INT)")
sql("INSERT INTO t VALUES (1), (2), (3)")
checkAnswer(sql("SELECT myIntSum(id) FROM t"), Row(6))
}

sql("DROP FUNCTION myIntSum")
checkAnswer(sql(s"SHOW FUNCTIONS FROM $hiveDbName LIKE 'myIntSum'"), Seq.empty)
}
}

private class MyIntSum extends UserDefinedAggregateFunction {

override def inputSchema: StructType = new StructType().add("input", IntegerType)

override def bufferSchema: StructType = new StructType().add("buffer", IntegerType)

override def dataType: DataType = IntegerType

override def deterministic: Boolean = true

override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer.update(0, 0)
}

override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer.update(0, buffer.getInt(0) + input.getInt(0))
}

override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0))
}

override def evaluate(buffer: Row): Any = {
buffer.getInt(0)
}
}
Loading