@@ -288,12 +288,6 @@ private CaseInsensitiveStringMap autoFillConfigurations(
Map<String, String> newOptions = new HashMap<>(options.asCaseSensitiveMap());
fillAliyunConfigurations(newOptions, hadoopConf);
fillCommonConfigurations(newOptions, sqlConf);

// if spark is case-insensitive, set allow upper case to catalog
if (!sqlConf.caseSensitiveAnalysis()) {
newOptions.put(ALLOW_UPPER_CASE.key(), "true");
}

return new CaseInsensitiveStringMap(newOptions);
}

@@ -313,13 +307,16 @@ private void fillCommonConfigurations(Map<String, String> options, SQLConf sqlCo
String warehouse = sqlConf.warehousePath();
options.put(WAREHOUSE.key(), warehouse);
}

if (!options.containsKey(METASTORE.key())) {
String metastore = sqlConf.getConf(StaticSQLConf.CATALOG_IMPLEMENTATION());
if (HiveCatalogOptions.IDENTIFIER.equals(metastore)) {
options.put(METASTORE.key(), metastore);
}
}

options.put(CatalogOptions.FORMAT_TABLE_ENABLED.key(), "false");

String sessionCatalogDefaultDatabase = SQLConfUtils.defaultDatabase(sqlConf);
if (options.containsKey(DEFAULT_DATABASE.key())) {
String userDefineDefaultDatabase = options.get(DEFAULT_DATABASE.key());
@@ -333,6 +330,11 @@ private void fillCommonConfigurations(Map<String, String> options, SQLConf sqlCo
} else {
options.put(DEFAULT_DATABASE.key(), sessionCatalogDefaultDatabase);
}

// if spark is case-insensitive, set allow upper case to catalog
if (!sqlConf.caseSensitiveAnalysis()) {
options.put(ALLOW_UPPER_CASE.key(), "true");
}
}

@Override
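For context, a minimal sketch of how the auto-filled options surface to a user session. The catalog name, class, and warehouse path below are illustrative assumptions, not taken from this PR; the point is that with Spark's default spark.sql.caseSensitive=false, the catalog now receives allow-upper-case=true in the same pass that fills the warehouse, metastore, and default database:

import org.apache.spark.sql.SparkSession

// Illustrative local session using the Paimon Spark catalog plugin.
val spark = SparkSession.builder()
  .master("local[2]")
  .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
  .config("spark.sql.catalog.paimon.warehouse", "file:/tmp/paimon") // assumed path
  .getOrCreate()

// spark.sql.caseSensitive defaults to false, so the options auto-filled above
// include allow-upper-case=true and mixed-case identifiers are accepted
// without any extra catalog configuration.
spark.sql("CREATE TABLE paimon.default.Demo_T (id INT, c STRING) USING paimon")
spark.sql("INSERT INTO paimon.default.Demo_T VALUES (1, 'a')")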
@@ -19,16 +19,14 @@
package org.apache.paimon.spark.catalyst.plans.logical

import org.apache.paimon.CoreOptions
import org.apache.paimon.spark.SparkCatalog
import org.apache.paimon.spark.catalog.Catalogs

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistryBase
import org.apache.spark.sql.catalyst.analysis.TableFunctionRegistry.TableFunctionBuilder
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.util.CaseInsensitiveStringMap

@@ -40,7 +38,7 @@ object PaimonTableValuedFunctions {

val supportedFnNames: Seq[String] = Seq(INCREMENTAL_QUERY)

type TableFunctionDescription = (FunctionIdentifier, ExpressionInfo, TableFunctionBuilder)
private type TableFunctionDescription = (FunctionIdentifier, ExpressionInfo, TableFunctionBuilder)

def getTableValueFunctionInjection(fnName: String): TableFunctionDescription = {
val (info, builder) = fnName match {
@@ -60,13 +58,7 @@

val sessionState = spark.sessionState
val catalogManager = sessionState.catalogManager

val sparkCatalog = new SparkCatalog()
val currentCatalog = catalogManager.currentCatalog.name()
sparkCatalog.initialize(
currentCatalog,
Catalogs.catalogOptions(currentCatalog, spark.sessionState.conf))

val sparkCatalog = catalogManager.currentCatalog.asInstanceOf[TableCatalog]
val tableId = sessionState.sqlParser.parseTableIdentifier(args.head.eval().toString)
val namespace = tableId.database.map(Array(_)).getOrElse(catalogManager.currentNamespace)
val ident = Identifier.of(namespace, tableId.table)
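Because the function now resolves tables through the session's current catalog (cast to TableCatalog) rather than instantiating a fresh SparkCatalog, it also works when the current catalog is, for example, the Hive-backed session catalog. A minimal usage sketch; the catalog name and snapshot ids here are illustrative assumptions:

// Incremental read of table `t` between snapshots 0 and 1,
// resolved against whichever catalog is currently selected.
spark.sql("USE my_paimon_catalog") // hypothetical catalog name
spark.sql("SELECT * FROM paimon_incremental_query('t', '0', '1')").show()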
@@ -18,64 +18,75 @@

package org.apache.paimon.spark.sql

import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.paimon.spark.PaimonHiveTestBase

import org.apache.spark.sql.{DataFrame, Row}

class TableValuedFunctionsTest extends PaimonSparkTestBase {
class TableValuedFunctionsTest extends PaimonHiveTestBase {

withPk.foreach {
hasPk =>
bucketModes.foreach {
bucket =>
test(s"incremental query: hasPk: $hasPk, bucket: $bucket") {
val prop = if (hasPk) {
s"'primary-key'='a,b', 'bucket' = '$bucket' "
} else if (bucket != -1) {
s"'bucket-key'='b', 'bucket' = '$bucket' "
} else {
"'write-only'='true'"
}
Seq("paimon", sparkCatalogName, paimonHiveCatalogName).foreach {
catalogName =>
sql(s"use $catalogName")

withTable("t") {
val prop = if (hasPk) {
s"'primary-key'='a,b', 'bucket' = '$bucket' "
} else if (bucket != -1) {
s"'bucket-key'='b', 'bucket' = '$bucket' "
} else {
"'write-only'='true'"
}

spark.sql(s"""
|CREATE TABLE T (a INT, b INT, c STRING)
|USING paimon
|TBLPROPERTIES ($prop)
|PARTITIONED BY (a)
|""".stripMargin)
spark.sql(s"""
|CREATE TABLE t (a INT, b INT, c STRING)
|USING paimon
|TBLPROPERTIES ($prop)
|PARTITIONED BY (a)
|""".stripMargin)

spark.sql("INSERT INTO T values (1, 1, '1'), (2, 2, '2')")
spark.sql("INSERT INTO T VALUES (1, 3, '3'), (2, 4, '4')")
spark.sql("INSERT INTO T VALUES (1, 5, '5'), (1, 7, '7')")
spark.sql("INSERT INTO t values (1, 1, '1'), (2, 2, '2')")
spark.sql("INSERT INTO t VALUES (1, 3, '3'), (2, 4, '4')")
spark.sql("INSERT INTO t VALUES (1, 5, '5'), (1, 7, '7')")

checkAnswer(
incrementalDF("T", 0, 1).orderBy("a", "b"),
Row(1, 1, "1") :: Row(2, 2, "2") :: Nil)
checkAnswer(
spark.sql("SELECT * FROM paimon_incremental_query('T', '0', '1') ORDER BY a, b"),
Row(1, 1, "1") :: Row(2, 2, "2") :: Nil)
checkAnswer(
incrementalDF("t", 0, 1).orderBy("a", "b"),
Row(1, 1, "1") :: Row(2, 2, "2") :: Nil)
checkAnswer(
spark.sql(
"SELECT * FROM paimon_incremental_query('t', '0', '1') ORDER BY a, b"),
Row(1, 1, "1") :: Row(2, 2, "2") :: Nil)

checkAnswer(
incrementalDF("T", 1, 2).orderBy("a", "b"),
Row(1, 3, "3") :: Row(2, 4, "4") :: Nil)
checkAnswer(
spark.sql("SELECT * FROM paimon_incremental_query('T', '1', '2') ORDER BY a, b"),
Row(1, 3, "3") :: Row(2, 4, "4") :: Nil)
checkAnswer(
incrementalDF("t", 1, 2).orderBy("a", "b"),
Row(1, 3, "3") :: Row(2, 4, "4") :: Nil)
checkAnswer(
spark.sql(
"SELECT * FROM paimon_incremental_query('t', '1', '2') ORDER BY a, b"),
Row(1, 3, "3") :: Row(2, 4, "4") :: Nil)

checkAnswer(
incrementalDF("T", 2, 3).orderBy("a", "b"),
Row(1, 5, "5") :: Row(1, 7, "7") :: Nil)
checkAnswer(
spark.sql("SELECT * FROM paimon_incremental_query('T', '2', '3') ORDER BY a, b"),
Row(1, 5, "5") :: Row(1, 7, "7") :: Nil)
checkAnswer(
incrementalDF("t", 2, 3).orderBy("a", "b"),
Row(1, 5, "5") :: Row(1, 7, "7") :: Nil)
checkAnswer(
spark.sql(
"SELECT * FROM paimon_incremental_query('t', '2', '3') ORDER BY a, b"),
Row(1, 5, "5") :: Row(1, 7, "7") :: Nil)

checkAnswer(
incrementalDF("T", 1, 3).orderBy("a", "b"),
Row(1, 3, "3") :: Row(1, 5, "5") :: Row(1, 7, "7") :: Row(2, 4, "4") :: Nil
)
checkAnswer(
spark.sql("SELECT * FROM paimon_incremental_query('T', '1', '3') ORDER BY a, b"),
Row(1, 3, "3") :: Row(1, 5, "5") :: Row(1, 7, "7") :: Row(2, 4, "4") :: Nil)
checkAnswer(
incrementalDF("t", 1, 3).orderBy("a", "b"),
Row(1, 3, "3") :: Row(1, 5, "5") :: Row(1, 7, "7") :: Row(2, 4, "4") :: Nil
)
checkAnswer(
spark.sql(
"SELECT * FROM paimon_incremental_query('t', '1', '3') ORDER BY a, b"),
Row(1, 3, "3") :: Row(1, 5, "5") :: Row(1, 7, "7") :: Row(2, 4, "4") :: Nil)
}
}
}
}
}