From 643ff6caaafe4ff5d76a7b1ee0feb2e4e9b89e2a Mon Sep 17 00:00:00 2001
From: zouxxyy
Date: Tue, 21 Jan 2025 14:50:25 +0800
Subject: [PATCH] [spark] Support UPDATE on Paimon tables with CHAR type
 columns by matching Project-wrapped DataSourceV2Relation in PaimonRelation

---
 .../paimon/spark/catalyst/analysis/PaimonRelation.scala   | 4 +++-
 .../org/apache/paimon/spark/sql/UpdateTableTestBase.scala | 8 ++++++++
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonRelation.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonRelation.scala
index 668d2b1e94c7..c362ca67c792 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonRelation.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonRelation.scala
@@ -22,7 +22,7 @@ import org.apache.paimon.spark.SparkTable
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, ResolvedTable}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 
 import scala.util.control.NonFatal
@@ -32,6 +32,7 @@ object PaimonRelation extends Logging {
   def unapply(plan: LogicalPlan): Option[SparkTable] =
     EliminateSubqueryAliases(plan) match {
+      case Project(_, DataSourceV2Relation(table: SparkTable, _, _, _, _)) => Some(table)
       case DataSourceV2Relation(table: SparkTable, _, _, _, _) => Some(table)
       case ResolvedTable(_, _, table: SparkTable, _) => Some(table)
       case _ => None
     }
@@ -49,6 +50,7 @@ object PaimonRelation extends Logging {
   def getPaimonRelation(plan: LogicalPlan): DataSourceV2Relation = {
     EliminateSubqueryAliases(plan) match {
+      case Project(_, d @ DataSourceV2Relation(_: SparkTable, _, _, _, _)) => d
       case d @ DataSourceV2Relation(_: SparkTable, _, _, _, _) => d
       case _ => throw new
         RuntimeException(s"It's not a paimon table, $plan")
     }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala
index 5beaea59548f..5cbefe9faa4d 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions
 import org.apache.paimon.spark.PaimonSparkTestBase
 import org.apache.paimon.spark.catalyst.analysis.Update
+import org.apache.spark.sql.Row
 import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
 
 abstract class UpdateTableTestBase extends PaimonSparkTestBase {
 
@@ -349,4 +350,11 @@ abstract class UpdateTableTestBase extends PaimonSparkTestBase {
       () => spark.sql("UPDATE T SET s.c2 = 'a_new', s = struct(11, 'a_new') WHERE s.c1 = 1"))
       .hasMessageContaining("Conflicting update/insert on attrs: s.c2, s")
   }
+
+  test("Paimon update: update table with char type") {
+    sql("CREATE TABLE T (id INT, s STRING, c CHAR(1))")
+    sql("INSERT INTO T VALUES (1, 's', 'a')")
+    sql("UPDATE T SET c = 'b' WHERE id = 1")
+    checkAnswer(sql("SELECT * FROM T"), Seq(Row(1, "s", "b")))
+  }
 }