From b3455c4fd3fafcd3acc86373ac00bc0a522608e4 Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Thu, 19 Dec 2024 16:37:25 +0800 Subject: [PATCH 1/4] 1 --- .../org/apache/paimon/spark/catalog/SupportFunction.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportFunction.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportFunction.java index 772a2f4ed53d..91a6d7b4a2e6 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportFunction.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportFunction.java @@ -29,6 +29,8 @@ import java.util.Arrays; +import scala.Option; + import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME; /** Catalog methods for working with Functions. */ @@ -54,7 +56,8 @@ default Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceExc return new Identifier[0]; } - throw new RuntimeException("Namespace " + Arrays.toString(namespace) + " is not valid"); + throw new NoSuchNamespaceException( + "Namespace " + Arrays.toString(namespace) + " is not valid", Option.empty()); } @Override @@ -66,6 +69,7 @@ default UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionExce } } - throw new RuntimeException("Function " + ident + " is not a paimon function"); + throw new NoSuchFunctionException( + "Function " + ident + " is not a paimon function", Option.empty()); } } From 19b59ee376a0dd29922b61a1086c8a290d7855f4 Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Thu, 19 Dec 2024 16:50:46 +0800 Subject: [PATCH 2/4] 1 --- .../apache/paimon/spark/catalog/SupportFunction.java | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportFunction.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportFunction.java index 91a6d7b4a2e6..bb16bbf1c7bc 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportFunction.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/catalog/SupportFunction.java @@ -27,10 +27,6 @@ import org.apache.spark.sql.connector.catalog.SupportsNamespaces; import org.apache.spark.sql.connector.catalog.functions.UnboundFunction; -import java.util.Arrays; - -import scala.Option; - import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME; /** Catalog methods for working with Functions. */ @@ -56,8 +52,7 @@ default Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceExc return new Identifier[0]; } - throw new NoSuchNamespaceException( - "Namespace " + Arrays.toString(namespace) + " is not valid", Option.empty()); + throw new NoSuchNamespaceException(namespace); } @Override @@ -69,7 +64,6 @@ default UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionExce } } - throw new NoSuchFunctionException( - "Function " + ident + " is not a paimon function", Option.empty()); + throw new NoSuchFunctionException(ident); } } From 7fa4f5f1edf3e960eb0d43cefa9d61d8f723c417 Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Thu, 19 Dec 2024 22:17:18 +0800 Subject: [PATCH 3/4] add test --- .../paimon/spark/sql/PaimonFunctionTest.scala | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala index 6b4543661fa2..1dac3784c75d 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala @@ -23,6 +23,8 @@ import org.apache.paimon.spark.catalog.functions.PaimonFunctions import org.apache.spark.sql.Row import org.apache.spark.sql.connector.catalog.{FunctionCatalog, Identifier} +import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} +import org.apache.spark.sql.types.{DataType, IntegerType, StructType} class PaimonFunctionTest extends PaimonHiveTestBase { @@ -85,4 +87,48 @@ class PaimonFunctionTest extends PaimonHiveTestBase { ) } } + + test("Paimon function: show and load function with SparkGenericCatalog") { + sql(s"USE $sparkCatalogName") + sql(s"USE $hiveDbName") + sql("CREATE FUNCTION myIntSum AS 'org.apache.paimon.spark.sql.MyIntSum'") + checkAnswer( + sql(s"SHOW FUNCTIONS FROM $hiveDbName LIKE 'myIntSum'"), + Row("spark_catalog.test_hive.myintsum")) + + withTable("t") { + sql("CREATE TABLE t (id INT)") + sql("INSERT INTO t VALUES (1), (2), (3)") + checkAnswer(sql("SELECT myIntSum(id) FROM t"), Row(6)) + } + + sql("DROP FUNCTION myIntSum") + checkAnswer(sql(s"SHOW FUNCTIONS FROM $hiveDbName LIKE 'myIntSum'"), Seq.empty) + } +} + +private class MyIntSum extends UserDefinedAggregateFunction { + override def inputSchema: StructType = new StructType().add("input", IntegerType) + + override def bufferSchema: StructType = new StructType().add("buffer", IntegerType) + + override def dataType: DataType = IntegerType + + override def deterministic: Boolean = true + + override def initialize(buffer: MutableAggregationBuffer): Unit = { + buffer.update(0, 0) + } + + override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { + buffer.update(0, input.getInt(0) + buffer.getInt(0)) + } + + override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { + buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0)) + } + + override def evaluate(buffer: Row): Any = { + buffer.getInt(0) + } } From e911dd7c729642afbde30fe420dfb704d7c834b7 Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Thu, 19 Dec 2024 22:31:25 +0800 Subject: [PATCH 4/4] 1 --- .../scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala index 1dac3784c75d..f399ca3e6f01 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonFunctionTest.scala @@ -108,6 +108,7 @@ class PaimonFunctionTest extends PaimonHiveTestBase { } private class MyIntSum extends UserDefinedAggregateFunction { + override def inputSchema: StructType = new StructType().add("input", IntegerType) override def bufferSchema: StructType = new StructType().add("buffer", IntegerType) @@ -121,7 +122,7 @@ private class MyIntSum extends UserDefinedAggregateFunction { } override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { - buffer.update(0, input.getInt(0) + buffer.getInt(0)) + buffer.update(0, buffer.getInt(0) + input.getInt(0)) } override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {