From c4f68e03c3d3d8d7766a04dd9eb4b366b896f026 Mon Sep 17 00:00:00 2001
From: zouxxyy
Date: Thu, 12 Dec 2024 10:28:31 +0800
Subject: [PATCH] 1111

---
 .../paimon/spark/SparkGenericCatalog.java     | 14 +--
 .../logical/PaimonTableValuedFunctions.scala  | 14 +--
 .../spark/sql/TableValuedFunctionsTest.scala  | 97 +++++++++++--------
 3 files changed, 65 insertions(+), 60 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
index 9957f0cdf91f..915e39c432de 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
@@ -288,12 +288,6 @@ private CaseInsensitiveStringMap autoFillConfigurations(
         Map<String, String> newOptions = new HashMap<>(options.asCaseSensitiveMap());
         fillAliyunConfigurations(newOptions, hadoopConf);
         fillCommonConfigurations(newOptions, sqlConf);
-
-        // if spark is case-insensitive, set allow upper case to catalog
-        if (!sqlConf.caseSensitiveAnalysis()) {
-            newOptions.put(ALLOW_UPPER_CASE.key(), "true");
-        }
-
         return new CaseInsensitiveStringMap(newOptions);
     }
 
@@ -313,13 +307,16 @@ private void fillCommonConfigurations(Map<String, String> options, SQLConf sqlCo
             String warehouse = sqlConf.warehousePath();
             options.put(WAREHOUSE.key(), warehouse);
         }
+
         if (!options.containsKey(METASTORE.key())) {
             String metastore = sqlConf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION());
             if (HiveCatalogOptions.IDENTIFIER.equals(metastore)) {
                 options.put(METASTORE.key(), metastore);
             }
         }
+
         options.put(CatalogOptions.FORMAT_TABLE_ENABLED.key(), "false");
+
         String sessionCatalogDefaultDatabase = SQLConfUtils.defaultDatabase(sqlConf);
         if (options.containsKey(DEFAULT_DATABASE.key())) {
             String userDefineDefaultDatabase = options.get(DEFAULT_DATABASE.key());
@@ -333,6 +330,11 @@ private void fillCommonConfigurations(Map<String, String> options, SQLConf sqlCo
         } else {
             options.put(DEFAULT_DATABASE.key(), sessionCatalogDefaultDatabase);
         }
+
+        // if spark is case-insensitive, set allow upper case to catalog
+        if (!sqlConf.caseSensitiveAnalysis()) {
+            options.put(ALLOW_UPPER_CASE.key(), "true");
+        }
     }
 
     @Override
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala
index 4d63c2a8d2be..6edbf533cbbc 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala
@@ -19,8 +19,6 @@
 package org.apache.paimon.spark.catalyst.plans.logical
 
 import org.apache.paimon.CoreOptions
-import org.apache.paimon.spark.SparkCatalog
-import org.apache.paimon.spark.catalog.Catalogs
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.FunctionIdentifier
@@ -28,7 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.FunctionRegistryBase
 import org.apache.spark.sql.catalyst.analysis.TableFunctionRegistry.TableFunctionBuilder
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, ExpressionInfo}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
-import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -40,7 +38,7 @@ object PaimonTableValuedFunctions {
 
   val supportedFnNames: Seq[String] = Seq(INCREMENTAL_QUERY)
 
-  type TableFunctionDescription = (FunctionIdentifier, ExpressionInfo, TableFunctionBuilder)
+  private type TableFunctionDescription = (FunctionIdentifier, ExpressionInfo, TableFunctionBuilder)
 
   def getTableValueFunctionInjection(fnName: String): TableFunctionDescription = {
     val (info, builder) = fnName match {
@@ -60,13 +58,7 @@ object PaimonTableValuedFunctions {
 
     val sessionState = spark.sessionState
     val catalogManager = sessionState.catalogManager
-
-    val sparkCatalog = new SparkCatalog()
-    val currentCatalog = catalogManager.currentCatalog.name()
-    sparkCatalog.initialize(
-      currentCatalog,
-      Catalogs.catalogOptions(currentCatalog, spark.sessionState.conf))
-
+    val sparkCatalog = catalogManager.currentCatalog.asInstanceOf[TableCatalog]
     val tableId = sessionState.sqlParser.parseTableIdentifier(args.head.eval().toString)
     val namespace = tableId.database.map(Array(_)).getOrElse(catalogManager.currentNamespace)
     val ident = Identifier.of(namespace, tableId.table)
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
index 2a689b631acd..b9c187b83a25 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/TableValuedFunctionsTest.scala
@@ -18,64 +18,75 @@
 
 package org.apache.paimon.spark.sql
 
-import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.spark.PaimonHiveTestBase
 
 import org.apache.spark.sql.{DataFrame, Row}
 
-class TableValuedFunctionsTest extends PaimonSparkTestBase {
+class TableValuedFunctionsTest extends PaimonHiveTestBase {
 
   withPk.foreach {
     hasPk =>
       bucketModes.foreach {
         bucket =>
           test(s"incremental query: hasPk: $hasPk, bucket: $bucket") {
-            val prop = if (hasPk) {
-              s"'primary-key'='a,b', 'bucket' = '$bucket' "
-            } else if (bucket != -1) {
-              s"'bucket-key'='b', 'bucket' = '$bucket' "
-            } else {
-              "'write-only'='true'"
-            }
+            Seq("paimon", sparkCatalogName, paimonHiveCatalogName).foreach {
+              catalogName =>
+                sql(s"use $catalogName")
+
+                withTable("t") {
+                  val prop = if (hasPk) {
+                    s"'primary-key'='a,b', 'bucket' = '$bucket' "
+                  } else if (bucket != -1) {
+                    s"'bucket-key'='b', 'bucket' = '$bucket' "
+                  } else {
+                    "'write-only'='true'"
+                  }
 
-            spark.sql(s"""
-                         |CREATE TABLE T (a INT, b INT, c STRING)
-                         |USING paimon
-                         |TBLPROPERTIES ($prop)
-                         |PARTITIONED BY (a)
-                         |""".stripMargin)
+                  spark.sql(s"""
+                               |CREATE TABLE t (a INT, b INT, c STRING)
+                               |USING paimon
+                               |TBLPROPERTIES ($prop)
+                               |PARTITIONED BY (a)
+                               |""".stripMargin)
 
-            spark.sql("INSERT INTO T values (1, 1, '1'), (2, 2, '2')")
-            spark.sql("INSERT INTO T VALUES (1, 3, '3'), (2, 4, '4')")
-            spark.sql("INSERT INTO T VALUES (1, 5, '5'), (1, 7, '7')")
+                  spark.sql("INSERT INTO t values (1, 1, '1'), (2, 2, '2')")
+                  spark.sql("INSERT INTO t VALUES (1, 3, '3'), (2, 4, '4')")
+                  spark.sql("INSERT INTO t VALUES (1, 5, '5'), (1, 7, '7')")
 
-            checkAnswer(
-              incrementalDF("T", 0, 1).orderBy("a", "b"),
-              Row(1, 1, "1") :: Row(2, 2, "2") :: Nil)
-            checkAnswer(
-              spark.sql("SELECT * FROM paimon_incremental_query('T', '0', '1') ORDER BY a, b"),
-              Row(1, 1, "1") :: Row(2, 2, "2") :: Nil)
+                  checkAnswer(
+                    incrementalDF("t", 0, 1).orderBy("a", "b"),
+                    Row(1, 1, "1") :: Row(2, 2, "2") :: Nil)
+                  checkAnswer(
+                    spark.sql(
+                      "SELECT * FROM paimon_incremental_query('t', '0', '1') ORDER BY a, b"),
+                    Row(1, 1, "1") :: Row(2, 2, "2") :: Nil)
 
-            checkAnswer(
-              incrementalDF("T", 1, 2).orderBy("a", "b"),
-              Row(1, 3, "3") :: Row(2, 4, "4") :: Nil)
-            checkAnswer(
-              spark.sql("SELECT * FROM paimon_incremental_query('T', '1', '2') ORDER BY a, b"),
-              Row(1, 3, "3") :: Row(2, 4, "4") :: Nil)
+                  checkAnswer(
+                    incrementalDF("t", 1, 2).orderBy("a", "b"),
+                    Row(1, 3, "3") :: Row(2, 4, "4") :: Nil)
+                  checkAnswer(
+                    spark.sql(
+                      "SELECT * FROM paimon_incremental_query('t', '1', '2') ORDER BY a, b"),
+                    Row(1, 3, "3") :: Row(2, 4, "4") :: Nil)
 
-            checkAnswer(
-              incrementalDF("T", 2, 3).orderBy("a", "b"),
-              Row(1, 5, "5") :: Row(1, 7, "7") :: Nil)
-            checkAnswer(
-              spark.sql("SELECT * FROM paimon_incremental_query('T', '2', '3') ORDER BY a, b"),
-              Row(1, 5, "5") :: Row(1, 7, "7") :: Nil)
+                  checkAnswer(
+                    incrementalDF("t", 2, 3).orderBy("a", "b"),
+                    Row(1, 5, "5") :: Row(1, 7, "7") :: Nil)
+                  checkAnswer(
+                    spark.sql(
+                      "SELECT * FROM paimon_incremental_query('t', '2', '3') ORDER BY a, b"),
+                    Row(1, 5, "5") :: Row(1, 7, "7") :: Nil)
 
-            checkAnswer(
-              incrementalDF("T", 1, 3).orderBy("a", "b"),
-              Row(1, 3, "3") :: Row(1, 5, "5") :: Row(1, 7, "7") :: Row(2, 4, "4") :: Nil
-            )
-            checkAnswer(
-              spark.sql("SELECT * FROM paimon_incremental_query('T', '1', '3') ORDER BY a, b"),
-              Row(1, 3, "3") :: Row(1, 5, "5") :: Row(1, 7, "7") :: Row(2, 4, "4") :: Nil)
+                  checkAnswer(
+                    incrementalDF("t", 1, 3).orderBy("a", "b"),
+                    Row(1, 3, "3") :: Row(1, 5, "5") :: Row(1, 7, "7") :: Row(2, 4, "4") :: Nil
+                  )
+                  checkAnswer(
+                    spark.sql(
+                      "SELECT * FROM paimon_incremental_query('t', '1', '3') ORDER BY a, b"),
+                    Row(1, 3, "3") :: Row(1, 5, "5") :: Row(1, 7, "7") :: Row(2, 4, "4") :: Nil)
+                }
+            }
           }
       }
   }