From 2b0648196794799542e95afa18eb9c0717fe1e3c Mon Sep 17 00:00:00 2001
From: zouxxyy
Date: Fri, 6 Dec 2024 19:13:17 +0800
Subject: [PATCH 1/4] add test

---
 .../sql/InsertOverwriteTableTestBase.scala    | 32 +++++++++++++++++++
 1 file changed, 32 insertions(+)

diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
index 03026e857429..deb809d51a53 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
@@ -508,4 +508,36 @@ abstract class InsertOverwriteTableTestBase extends PaimonSparkTestBase {
       ) :: Nil
     )
   }
+
+  test("Paimon Insert: insert with column list") {
+    sql("CREATE TABLE T (name String, student_id INT) PARTITIONED BY (address STRING)")
+
+    // insert with a column list
+    sql("INSERT INTO T (name, student_id, address) VALUES ('a', '1', 'Hangzhou')")
+    sql("INSERT INTO T (name) VALUES ('b')")
+    sql("INSERT INTO T (address, name) VALUES ('Hangzhou', 'c')")
+
+    // insert with both a partition spec and a column list
+    sql("INSERT INTO T PARTITION (address='Beijing') (name) VALUES ('d')")
+    sql("INSERT INTO T PARTITION (address='Hangzhou') (student_id, name) VALUES (5, 'e')")
+
+    checkAnswer(
+      sql("SELECT * FROM T ORDER BY name"),
+      Seq(
+        Row("a", 1, "Hangzhou"),
+        Row("b", null, null),
+        Row("c", null, "Hangzhou"),
+        Row("d", null, "Beijing"),
+        Row("e", 5, "Hangzhou"))
+    )
+
+    // insert overwrite with a column list
+    sql("INSERT OVERWRITE T (name, address) VALUES ('f', 'Shanghai')")
+    checkAnswer(sql("SELECT * FROM T ORDER BY name"), Row("f", null, "Shanghai"))
+
+    // insert overwrite with both a partition spec and a column list
+    sql(
+      "INSERT OVERWRITE T PARTITION (address='Shanghai') (name, student_id) VALUES ('g', 7)")
+    checkAnswer(sql("SELECT * FROM T ORDER BY name"), Row("g", 7, "Shanghai"))
+  }
 }

From 1393ef0d70354ded897f3d46f1f705c436f3f214 Mon Sep 17 00:00:00 2001
From: zouxxyy
Date: Fri, 6 Dec 2024 19:22:31 +0800
Subject: [PATCH 2/4] 1

---
 .../apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
index deb809d51a53..0aac6c9270e3 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
@@ -536,8 +536,7 @@ abstract class InsertOverwriteTableTestBase extends PaimonSparkTestBase {
     checkAnswer(sql("SELECT * FROM T ORDER BY name"), Row("f", null, "Shanghai"))
 
     // insert overwrite with both a partition spec and a column list
-    sql(
-      "INSERT OVERWRITE T PARTITION (address='Shanghai') (name, student_id) VALUES ('g', 7)")
+    sql("INSERT OVERWRITE T PARTITION (address='Shanghai') (name, student_id) VALUES ('g', 7)")
     checkAnswer(sql("SELECT * FROM T ORDER BY name"), Row("g", 7, "Shanghai"))
   }
 }
From 0603c8da33d47d8580a4ee36214fd24844cdf060 Mon Sep 17 00:00:00 2001
From: zouxxyy
Date: Fri, 6 Dec 2024 21:06:00 +0800
Subject: [PATCH 3/4] 1

---
 .../sql/InsertOverwriteTableTestBase.scala    | 33 +++++++++++++++----
 1 file changed, 27 insertions(+), 6 deletions(-)

diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
index 0aac6c9270e3..977b74707069 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
@@ -514,11 +514,24 @@ abstract class InsertOverwriteTableTestBase extends PaimonSparkTestBase {
 
     // insert with a column list
     sql("INSERT INTO T (name, student_id, address) VALUES ('a', '1', 'Hangzhou')")
-    sql("INSERT INTO T (name) VALUES ('b')")
-    sql("INSERT INTO T (address, name) VALUES ('Hangzhou', 'c')")
+    // Since Spark 3.4, INSERT INTO commands with explicit column lists comprising fewer columns than the target
+    // table will automatically add the corresponding default values for the remaining columns (or NULL for any column
+    // lacking an explicitly-assigned default value). In Spark 3.3 or earlier, these commands would have failed.
+    // See https://issues.apache.org/jira/browse/SPARK-42521
+    if (gteqSpark3_4) {
+      sql("INSERT INTO T (name) VALUES ('b')")
+      sql("INSERT INTO T (address, name) VALUES ('Hangzhou', 'c')")
+    } else {
+      sql("INSERT INTO T (name, student_id, address) VALUES ('b', null, null)")
+      sql("INSERT INTO T (name, student_id, address) VALUES ('c', null, 'Hangzhou')")
+    }
 
     // insert with both a partition spec and a column list
-    sql("INSERT INTO T PARTITION (address='Beijing') (name) VALUES ('d')")
+    if (gteqSpark3_4) {
+      sql("INSERT INTO T PARTITION (address='Beijing') (name) VALUES ('d')")
+    } else {
+      sql("INSERT INTO T PARTITION (address='Beijing') (name, student_id) VALUES ('d', null)")
+    }
     sql("INSERT INTO T PARTITION (address='Hangzhou') (student_id, name) VALUES (5, 'e')")
 
     checkAnswer(
@@ -532,11 +545,19 @@ abstract class InsertOverwriteTableTestBase extends PaimonSparkTestBase {
     )
 
     // insert overwrite with a column list
-    sql("INSERT OVERWRITE T (name, address) VALUES ('f', 'Shanghai')")
+    if (gteqSpark3_4) {
+      sql("INSERT OVERWRITE T (name, address) VALUES ('f', 'Shanghai')")
+    } else {
+      sql("INSERT OVERWRITE T (name, student_id, address) VALUES ('f', null, 'Shanghai')")
+    }
     checkAnswer(sql("SELECT * FROM T ORDER BY name"), Row("f", null, "Shanghai"))
 
     // insert overwrite with both a partition spec and a column list
-    sql("INSERT OVERWRITE T PARTITION (address='Shanghai') (name, student_id) VALUES ('g', 7)")
-    checkAnswer(sql("SELECT * FROM T ORDER BY name"), Row("g", 7, "Shanghai"))
+    if (gteqSpark3_4) {
+      sql("INSERT OVERWRITE T PARTITION (address='Shanghai') (name) VALUES ('g')")
+    } else {
+      sql("INSERT OVERWRITE T PARTITION (address='Shanghai') (name, student_id) VALUES ('g', null)")
+    }
+    checkAnswer(sql("SELECT * FROM T ORDER BY name"), Row("g", null, "Shanghai"))
   }
 }

From ad4c9a6464ae040309c63d4ebbcba740c32a8247 Mon Sep 17 00:00:00 2001
From: zouxxyy
Date: Sat, 7 Dec 2024 00:03:19 +0800
Subject: [PATCH 4/4] update docs

---
 docs/content/spark/sql-write.md | 39 +++++++++++++++++++++++----------
 1 file changed, 27 insertions(+), 12 deletions(-)

diff --git a/docs/content/spark/sql-write.md b/docs/content/spark/sql-write.md
index d2777110914f..5f4fa2dabc9f 100644
--- a/docs/content/spark/sql-write.md
+++ b/docs/content/spark/sql-write.md
@@ -26,17 +26,30 @@ under the License.
 
 # SQL Write
 
-## Syntax
+## Insert Table
+
+The `INSERT` statement inserts new rows into a table or overwrites the existing data in the table. The inserted rows can be specified by value expressions or by the result of a query.
+
+**Syntax**
 
 ```sql
 INSERT { INTO | OVERWRITE } table_identifier [ part_spec ] [ column_list ] { value_expr | query };
 ```
+**Parameters**
+
+- **table_identifier**: Specifies a table name, which may be optionally qualified with a database name.
+
+- **part_spec**: An optional parameter that specifies a comma-separated list of key and value pairs for partitions.
 
-For more information, please check the syntax document:
+- **column_list**: An optional parameter that specifies a comma-separated list of columns belonging to the `table_identifier` table. Spark will reorder the columns of the input query to match the table schema according to the specified column list.
 
-[Spark INSERT Statement](https://spark.apache.org/docs/latest/sql-ref-syntax-dml-insert-table.html)
+  Note: Since Spark 3.4, INSERT INTO commands with explicit column lists comprising fewer columns than the target table will automatically add the corresponding default values for the remaining columns (or NULL for any column lacking an explicitly-assigned default value). In Spark 3.3 or earlier, the column list's size must equal the number of columns in the target table; otherwise these commands fail.
+
+- **value_expr** ( { value | NULL } [ , … ] ) [ , ( … ) ]: Specifies the values to be inserted. Either an explicitly specified value or a NULL can be inserted. A comma must be used to separate each value in the clause. More than one set of values can be specified to insert multiple rows.
+
+For more information, please check the syntax document: [Spark INSERT Statement](https://spark.apache.org/docs/latest/sql-ref-syntax-dml-insert-table.html)
 
-## INSERT INTO
+### Insert Into
 
 Use `INSERT INTO` to apply records and changes to tables.
 
@@ -44,15 +57,15 @@
 INSERT INTO my_table SELECT ...
 ```
 
-## Overwriting the Whole Table
+### Insert Overwrite
 
-Use `INSERT OVERWRITE` to overwrite the whole unpartitioned table.
+Use `INSERT OVERWRITE` to overwrite the whole table.
 
 ```sql
 INSERT OVERWRITE my_table SELECT ...
 ```
 
-### Overwriting a Partition
+#### Insert Overwrite Partition
 
 Use `INSERT OVERWRITE` to overwrite a partition.
 
@@ -60,7 +73,7 @@
 INSERT OVERWRITE my_table PARTITION (key1 = value1, key2 = value2, ...) SELECT ...
 ```
 
-### Dynamic Overwrite
+#### Dynamic Overwrite Partition
 
 Spark's default overwrite mode is `static` partition overwrite. To enable dynamic overwrite, you need to set the Spark session configuration `spark.sql.sources.partitionOverwriteMode` to `dynamic`.
 
@@ -97,13 +110,15 @@
 SELECT * FROM my_table;
 */
 ```
 
-## Truncate tables
+## Truncate Table
+
+The `TRUNCATE TABLE` statement removes all the rows from a table or partition(s).
 
 ```sql
 TRUNCATE TABLE my_table;
 ```
 
-## Updating tables
+## Update Table
 
 Spark supports updating PrimitiveType and StructType columns, for example:
 
@@ -125,13 +140,13 @@
 UPDATE t SET name = 'a_new' WHERE id = 1;
 UPDATE t SET s.c2 = 'a_new' WHERE s.c1 = 1;
 ```
 
-## Deleting from table
+## Delete From Table
 
 ```sql
 DELETE FROM my_table WHERE currency = 'UNKNOWN';
 ```
 
-## Merging into table
+## Merge Into Table
 
 Paimon currently supports Merge Into syntax in Spark 3+, which allows a set of updates, insertions and deletions based on a source table in a single commit.
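
For readers who want to try out the behavior this series tests and documents, here is a minimal sketch in Spark SQL. It assumes a Spark 3.4+ session with a Paimon catalog already configured; the table name `my_table` is illustrative and not part of the patch:

```sql
-- Sketch only: assumes Spark 3.4+ and a configured Paimon catalog.
CREATE TABLE my_table (name STRING, student_id INT) PARTITIONED BY (address STRING);

-- Since Spark 3.4 a column list may omit columns; omitted columns receive
-- their default value, or NULL when no default is defined.
INSERT INTO my_table (name) VALUES ('a');

-- A partition spec and a column list can be combined.
INSERT INTO my_table PARTITION (address = 'Hangzhou') (name) VALUES ('b');

-- Static overwrite (Spark's default) replaces the partitions named in the
-- PARTITION clause, or the whole table when no spec is given.
INSERT OVERWRITE my_table PARTITION (address = 'Hangzhou') (name) VALUES ('c');

-- Dynamic overwrite replaces only the partitions that receive new rows.
SET spark.sql.sources.partitionOverwriteMode = dynamic;
INSERT OVERWRITE my_table VALUES ('d', 4, 'Beijing');
```

On Spark 3.3 or earlier the column-list statements above must instead enumerate every column (passing explicit NULLs), which is exactly the branch the `gteqSpark3_4` guard in patch 3 exercises.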