@@ -22,7 +22,7 @@ import org.apache.paimon.spark.SparkTable
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, ResolvedTable}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 
 import scala.util.control.NonFatal
@@ -32,6 +32,7 @@ object PaimonRelation extends Logging {
 
   def unapply(plan: LogicalPlan): Option[SparkTable] =
     EliminateSubqueryAliases(plan) match {
+      case Project(_, DataSourceV2Relation(table: SparkTable, _, _, _, _)) => Some(table)
       case DataSourceV2Relation(table: SparkTable, _, _, _, _) => Some(table)
       case ResolvedTable(_, _, table: SparkTable, _) => Some(table)
       case _ => None
@@ -49,6 +50,7 @@
 
   def getPaimonRelation(plan: LogicalPlan): DataSourceV2Relation = {
     EliminateSubqueryAliases(plan) match {
+      case Project(_, d @ DataSourceV2Relation(_: SparkTable, _, _, _, _)) => d
       case d @ DataSourceV2Relation(_: SparkTable, _, _, _, _) => d
       case _ => throw new RuntimeException(s"It's not a paimon table, $plan")
     }
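Why the extra Project cases help (a reconstructed explanation, not text from the PR): depending on the Spark version, reading a table with a CHAR column can apply read-side char padding by wrapping the scan in a Project, so EliminateSubqueryAliases(plan) no longer returns a bare DataSourceV2Relation and the old matches fail with "It's not a paimon table". The sketch below only prints the analyzed read plan to show that wrapper; the catalog name, warehouse path, and object name are placeholders, not values from this PR.

import org.apache.spark.sql.SparkSession

object CharReadPlanDemo {
  def main(args: Array[String]): Unit = {
    // Assumes a local Paimon catalog; config keys follow the Paimon Spark docs,
    // but the warehouse path is a placeholder.
    val spark = SparkSession.builder()
      .master("local[1]")
      .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
      .config("spark.sql.catalog.paimon.warehouse", "file:/tmp/paimon-warehouse")
      .getOrCreate()

    spark.sql("USE paimon.default")
    spark.sql("CREATE TABLE T (id INT, c CHAR(1))")

    // With read-side char padding enabled, the analyzed plan is roughly a Project
    // applying padding on top of the DataSourceV2Relation scan, e.g.
    //   Project [id, staticinvoke(..., readSidePadding, c, 1) AS c]
    //   +- RelationV2 [id, c] T
    // which is the shape the new Project(...) cases above now accept.
    println(spark.table("T").queryExecution.analyzed.treeString)

    spark.stop()
  }
}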
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions
 import org.apache.paimon.spark.PaimonSparkTestBase
 import org.apache.paimon.spark.catalyst.analysis.Update
 
+import org.apache.spark.sql.Row
 import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
 
 abstract class UpdateTableTestBase extends PaimonSparkTestBase {
@@ -349,4 +350,11 @@ abstract class UpdateTableTestBase extends PaimonSparkTestBase {
       () => spark.sql("UPDATE T SET s.c2 = 'a_new', s = struct(11, 'a_new') WHERE s.c1 = 1"))
       .hasMessageContaining("Conflicting update/insert on attrs: s.c2, s")
   }
+
+  test("Paimon update: update table with char type") {
+    sql("CREATE TABLE T (id INT, s STRING, c CHAR(1))")
+    sql("INSERT INTO T VALUES (1, 's', 'a')")
+    sql("UPDATE T SET c = 'b' WHERE id = 1")
+    checkAnswer(sql("SELECT * FROM T"), Seq(Row(1, "s", "b")))
+  }
 }
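For reference, a rough repro of what the new test guards against, assuming a Spark session with a Paimon catalog configured; the pre-fix error text is reconstructed from the getPaimonRelation change above rather than copied from CI output.

// Hypothetical repro mirroring the test above. Before the Project cases were added,
// the UPDATE on a table with a CHAR(1) column failed during analysis with an error like
//   java.lang.RuntimeException: It's not a paimon table, Project ...
// because the padded CHAR read wrapped the relation in a Project.
spark.sql("CREATE TABLE T (id INT, s STRING, c CHAR(1))")
spark.sql("INSERT INTO T VALUES (1, 's', 'a')")
spark.sql("UPDATE T SET c = 'b' WHERE id = 1")   // succeeds with this change
spark.sql("SELECT * FROM T").show()              // expected row: 1, s, b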