@@ -100,4 +100,232 @@ class WriteMergeSchemaTest extends PaimonSparkTestBase {
      }
    }
  }

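  // With merge-schema disabled, an INSERT whose source carries a column that is not in the
  // table schema must fail the column-count check instead of evolving the schema.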
test("Write merge schema: fail when merge schema is disabled but new columns are provided") {
withTable("t") {
withSparkSQLConf("spark.paimon.write.merge-schema" -> "false") {
sql("CREATE TABLE t (a INT, b STRING)")
sql("INSERT INTO t VALUES (1, '1'), (2, '2')")

val error = intercept[RuntimeException] {
spark.sql("INSERT INTO t BY NAME SELECT 3 AS a, '3' AS b, 3 AS c")
}.getMessage
assert(error.contains("the number of data columns don't match with the table schema's"))
}
}
}

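  // With merge-schema enabled, new numeric columns (byte through decimal) are appended to
  // the table schema, and rows written before the evolution read back null for them.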
test("Write merge schema: numeric types") {
withTable("t") {
withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
sql("CREATE TABLE t (a INT, b STRING)")
sql("INSERT INTO t VALUES (1, '1'), (2, '2')")

// new columns with numeric types
sql(
"INSERT INTO t BY NAME SELECT 3 AS a, '3' AS b, " +
"cast(10 as byte) AS byte_col, " +
"cast(1000 as short) AS short_col, " +
"100000 AS int_col, " +
"10000000000L AS long_col, " +
"cast(1.23 as float) AS float_col, " +
"4.56 AS double_col, " +
"cast(7.89 as decimal(10,2)) AS decimal_col")
checkAnswer(
sql("SELECT * FROM t ORDER BY a"),
Seq(
Row(1, "1", null, null, null, null, null, null, null),
Row(2, "2", null, null, null, null, null, null, null),
Row(
3,
"3",
10.toByte,
1000.toShort,
100000,
10000000000L,
1.23f,
4.56d,
java.math.BigDecimal.valueOf(7.89))
)
)

// missing columns and new columns with numeric types
sql(
"INSERT INTO t BY NAME SELECT '4' AS d, '4' AS b, " +
"cast(20 as byte) AS byte_col, " +
"cast(2000 as short) AS short_col, " +
"200000 AS int_col, " +
"20000000000L AS long_col, " +
"cast(2.34 as float) AS float_col, " +
"5.67 AS double_col, " +
"cast(8.96 as decimal(10,2)) AS decimal_col")
checkAnswer(
sql("SELECT * FROM t ORDER BY a"),
Seq(
Row(
null,
"4",
20.toByte,
2000.toShort,
200000,
20000000000L,
2.34f,
5.67d,
java.math.BigDecimal.valueOf(8.96),
"4"),
Row(1, "1", null, null, null, null, null, null, null, null),
Row(2, "2", null, null, null, null, null, null, null, null),
Row(
3,
"3",
10.toByte,
1000.toShort,
100000,
10000000000L,
1.23f,
4.56d,
java.math.BigDecimal.valueOf(7.89),
null)
)
)
}
}
}

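  // Schema evolution with date and timestamp columns: existing rows read back null for the
  // newly added columns.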
test("Write merge schema: date and time types") {
withTable("t") {
withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
sql("CREATE TABLE t (a INT, b STRING)")
sql("INSERT INTO t VALUES (1, '1'), (2, '2')")

// new columns with date and time types
sql(
"INSERT INTO t BY NAME SELECT 3 AS a, '3' AS b, " +
"cast('2023-01-01' as date) AS date_col, " +
"cast('2023-01-01 12:00:00' as timestamp) AS timestamp_col")
checkAnswer(
sql("SELECT * FROM t ORDER BY a"),
Seq(
Row(1, "1", null, null),
Row(2, "2", null, null),
Row(
3,
"3",
java.sql.Date.valueOf("2023-01-01"),
java.sql.Timestamp.valueOf("2023-01-01 12:00:00"))
)
)

// missing columns and new columns with date and time types
sql(
"INSERT INTO t BY NAME SELECT '4' AS d, '4' AS b, " +
"cast('2023-12-31' as date) AS date_col, " +
"cast('2023-12-31 23:59:59' as timestamp) AS timestamp_col")
checkAnswer(
sql("SELECT * FROM t ORDER BY a"),
Seq(
Row(
null,
"4",
java.sql.Date.valueOf("2023-12-31"),
java.sql.Timestamp.valueOf("2023-12-31 23:59:59"),
"4"),
Row(1, "1", null, null, null),
Row(2, "2", null, null, null),
Row(
3,
"3",
java.sql.Date.valueOf("2023-01-01"),
java.sql.Timestamp.valueOf("2023-01-01 12:00:00"),
null)
)
)
}
}
}

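  // Schema evolution with array, map, and struct columns.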
test("Write merge schema: complex types") {
withTable("t") {
withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
sql("CREATE TABLE t (a INT, b STRING)")
sql("INSERT INTO t VALUES (1, '1'), (2, '2')")

// new columns with complex types
sql(
"INSERT INTO t BY NAME SELECT 3 AS a, '3' AS b, " +
"array(1, 2, 3) AS array_col, " +
"map('key1', 'value1', 'key2', 'value2') AS map_col, " +
"struct('x', 1) AS struct_col")
checkAnswer(
sql("SELECT * FROM t ORDER BY a"),
Seq(
Row(1, "1", null, null, null),
Row(2, "2", null, null, null),
Row(3, "3", Array(1, 2, 3), Map("key1" -> "value1", "key2" -> "value2"), Row("x", 1))
)
)

// missing columns and new columns with complex types
sql(
"INSERT INTO t BY NAME SELECT '4' AS d, '4' AS b, " +
"array(4, 5, 6) AS array_col, " +
"map('key3', 'value3') AS map_col, " +
"struct('y', 2) AS struct_col")
checkAnswer(
sql("SELECT * FROM t ORDER BY a"),
Seq(
Row(null, "4", Array(4, 5, 6), Map("key3" -> "value3"), Row("y", 2), "4"),
Row(1, "1", null, null, null, null),
Row(2, "2", null, null, null, null),
Row(
3,
"3",
Array(1, 2, 3),
Map("key1" -> "value1", "key2" -> "value2"),
Row("x", 1),
null)
)
)
}
}
}

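  // Schema evolution with binary and boolean columns.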
test("Write merge schema: binary and boolean types") {
withTable("t") {
withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
sql("CREATE TABLE t (a INT, b STRING)")
sql("INSERT INTO t VALUES (1, '1'), (2, '2')")

// new columns with binary and boolean types
sql(
"INSERT INTO t BY NAME SELECT 3 AS a, '3' AS b, " +
"cast('binary_data' as binary) AS binary_col, " +
"true AS boolean_col")
checkAnswer(
sql("SELECT * FROM t ORDER BY a"),
Seq(
Row(1, "1", null, null),
Row(2, "2", null, null),
Row(3, "3", "binary_data".getBytes("UTF-8"), true)
)
)

// missing columns and new columns with binary and boolean types
sql(
"INSERT INTO t BY NAME SELECT '4' AS d, '4' AS b, " +
"cast('more_data' as binary) AS binary_col, " +
"false AS boolean_col")
checkAnswer(
sql("SELECT * FROM t ORDER BY a"),
Seq(
Row(null, "4", "more_data".getBytes("UTF-8"), false, "4"),
Row(1, "1", null, null, null),
Row(2, "2", null, null, null),
Row(3, "3", "binary_data".getBytes("UTF-8"), true, null)
)
)
}
}
}

}