From cda70d186a10e8126360c2e558750b6adb08dea2 Mon Sep 17 00:00:00 2001 From: jonahgao Date: Mon, 13 Nov 2023 15:03:01 +0800 Subject: [PATCH 1/3] feat: fill missing values with NULLs while inserting --- datafusion/sql/src/statement.rs | 46 +++++++++++-------- datafusion/sql/tests/sql_integration.rs | 33 ++++++------- datafusion/sqllogictest/test_files/insert.slt | 14 +++++- .../test_files/insert_to_external.slt | 14 +++++- 4 files changed, 69 insertions(+), 38 deletions(-) diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index ecc77b0442235..49755729d2d50 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -33,7 +33,7 @@ use datafusion_common::parsers::CompressionTypeVariant; use datafusion_common::{ not_impl_err, plan_datafusion_err, plan_err, unqualified_field_not_found, Column, Constraints, DFField, DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference, - Result, SchemaReference, TableReference, ToDFSchema, + Result, ScalarValue, SchemaReference, TableReference, ToDFSchema, }; use datafusion_expr::dml::{CopyOptions, CopyTo}; use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check; @@ -1087,9 +1087,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let arrow_schema = (*table_source.schema()).clone(); let table_schema = DFSchema::try_from(arrow_schema)?; - // Get insert fields and index_mapping - // The i-th field of the table is `fields[index_mapping[i]]` - let (fields, index_mapping) = if columns.is_empty() { + // Get insert fields and target table's value indices + // + // if value_indices[i] = Some(j), it means that the value of the i-th target table's column is + // derived from the j-th output of the source. + // + // if value_indices[i] = None, it means that the value of the i-th target table's column is + // not provided, and should be filled with a default value later. + let (fields, value_indices) = if columns.is_empty() { // Empty means we're inserting into all columns of the table ( table_schema.fields().clone(), @@ -1098,7 +1103,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { .collect::>(), ) } else { - let mut mapping = vec![None; table_schema.fields().len()]; + let mut value_indices = vec![None; table_schema.fields().len()]; let fields = columns .into_iter() .map(|c| self.normalizer.normalize(c)) @@ -1107,19 +1112,19 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let column_index = table_schema .index_of_column_by_name(None, &c)? .ok_or_else(|| unqualified_field_not_found(&c, &table_schema))?; - if mapping[column_index].is_some() { + if value_indices[column_index].is_some() { return Err(DataFusionError::SchemaError( datafusion_common::SchemaError::DuplicateUnqualifiedField { name: c, }, )); } else { - mapping[column_index] = Some(i); + value_indices[column_index] = Some(i); } Ok(table_schema.field(column_index).clone()) }) .collect::>>()?; - (fields, mapping) + (fields, value_indices) }; // infer types for Values clause... other types should be resolvable the regular way @@ -1154,17 +1159,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { plan_err!("Column count doesn't match insert query!")?; } - let exprs = index_mapping + let exprs = value_indices .into_iter() - .flatten() - .map(|i| { - let target_field = &fields[i]; - let source_field = source.schema().field(i); - let expr = - datafusion_expr::Expr::Column(source_field.unqualified_column()) - .cast_to(target_field.data_type(), source.schema())? - .alias(target_field.name()); - Ok(expr) + .enumerate() + .map(|(i, value_index)| { + let target_field = table_schema.field(i); + let expr = match value_index { + Some(v) => { + let source_field = source.schema().field(v); + datafusion_expr::Expr::Column(source_field.qualified_column()) + .cast_to(target_field.data_type(), source.schema())? + } + // Fill the default value for the column, currently only supports NULL. + None => datafusion_expr::Expr::Literal(ScalarValue::Null) + .cast_to(target_field.data_type(), &DFSchema::empty())?, + }; + Ok(expr.alias(target_field.name())) }) .collect::>>()?; let source = project(source, exprs)?; diff --git a/datafusion/sql/tests/sql_integration.rs b/datafusion/sql/tests/sql_integration.rs index ff6dca7eef2a8..4c2bad1c719e7 100644 --- a/datafusion/sql/tests/sql_integration.rs +++ b/datafusion/sql/tests/sql_integration.rs @@ -422,12 +422,11 @@ CopyTo: format=csv output_url=output.csv single_file_output=true options: () fn plan_insert() { let sql = "insert into person (id, first_name, last_name) values (1, 'Alan', 'Turing')"; - let plan = r#" -Dml: op=[Insert Into] table=[person] - Projection: CAST(column1 AS UInt32) AS id, column2 AS first_name, column3 AS last_name - Values: (Int64(1), Utf8("Alan"), Utf8("Turing")) - "# - .trim(); + let plan = "Dml: op=[Insert Into] table=[person]\ + \n Projection: CAST(column1 AS UInt32) AS id, column2 AS first_name, column3 AS last_name, \ + CAST(NULL AS Int32) AS age, CAST(NULL AS Utf8) AS state, CAST(NULL AS Float64) AS salary, \ + CAST(NULL AS Timestamp(Nanosecond, None)) AS birth_date, CAST(NULL AS Int32) AS 😀\ + \n Values: (Int64(1), Utf8(\"Alan\"), Utf8(\"Turing\"))"; quick_test(sql, plan); } @@ -4037,12 +4036,11 @@ Dml: op=[Update] table=[person] fn test_prepare_statement_insert_infer() { let sql = "insert into person (id, first_name, last_name) values ($1, $2, $3)"; - let expected_plan = r#" -Dml: op=[Insert Into] table=[person] - Projection: column1 AS id, column2 AS first_name, column3 AS last_name - Values: ($1, $2, $3) - "# - .trim(); + let expected_plan = "Dml: op=[Insert Into] table=[person]\ + \n Projection: column1 AS id, column2 AS first_name, column3 AS last_name, \ + CAST(NULL AS Int32) AS age, CAST(NULL AS Utf8) AS state, CAST(NULL AS Float64) AS salary, \ + CAST(NULL AS Timestamp(Nanosecond, None)) AS birth_date, CAST(NULL AS Int32) AS 😀\ + \n Values: ($1, $2, $3)"; let expected_dt = "[Int32]"; let plan = prepare_stmt_quick_test(sql, expected_plan, expected_dt); @@ -4061,12 +4059,11 @@ Dml: op=[Insert Into] table=[person] ScalarValue::Utf8(Some("Alan".to_string())), ScalarValue::Utf8(Some("Turing".to_string())), ]; - let expected_plan = r#" -Dml: op=[Insert Into] table=[person] - Projection: column1 AS id, column2 AS first_name, column3 AS last_name - Values: (UInt32(1), Utf8("Alan"), Utf8("Turing")) - "# - .trim(); + let expected_plan = "Dml: op=[Insert Into] table=[person]\ + \n Projection: column1 AS id, column2 AS first_name, column3 AS last_name, \ + CAST(NULL AS Int32) AS age, CAST(NULL AS Utf8) AS state, CAST(NULL AS Float64) AS salary, \ + CAST(NULL AS Timestamp(Nanosecond, None)) AS birth_date, CAST(NULL AS Int32) AS 😀\ + \n Values: (UInt32(1), Utf8(\"Alan\"), Utf8(\"Turing\"))"; let plan = plan.replace_params_with_values(¶m_values).unwrap(); prepare_stmt_replace_params_quick_test(plan, param_values, expected_plan); diff --git a/datafusion/sqllogictest/test_files/insert.slt b/datafusion/sqllogictest/test_files/insert.slt index 8b9fd52e0d941..fed55163e859e 100644 --- a/datafusion/sqllogictest/test_files/insert.slt +++ b/datafusion/sqllogictest/test_files/insert.slt @@ -258,14 +258,17 @@ insert into table_without_values(name, id) values(4, 'zoo'); statement error Error during planning: Column count doesn't match insert query! insert into table_without_values(id) values(4, 'zoo'); -statement error Error during planning: Inserting query must have the same schema with the table. +query IT insert into table_without_values(id) values(4); +---- +1 query IT rowsort select * from table_without_values; ---- 1 foo 2 bar +4 NULL statement ok drop table table_without_values; @@ -285,6 +288,14 @@ insert into table_without_values values(2, NULL); ---- 1 +query II +insert into table_without_values(field1) values(3); +---- +1 + +statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable +insert into table_without_values(field2) values(300); + statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable insert into table_without_values values(NULL, 300); @@ -296,6 +307,7 @@ select * from table_without_values; ---- 1 100 2 NULL +3 NULL statement ok drop table table_without_values; diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt index d6449bc2726ef..5a48686b69a80 100644 --- a/datafusion/sqllogictest/test_files/insert_to_external.slt +++ b/datafusion/sqllogictest/test_files/insert_to_external.slt @@ -468,14 +468,17 @@ insert into table_without_values(name, id) values(4, 'zoo'); statement error Error during planning: Column count doesn't match insert query! insert into table_without_values(id) values(4, 'zoo'); -statement error Error during planning: Inserting query must have the same schema with the table. +query IT insert into table_without_values(id) values(4); +---- +1 query IT rowsort select * from table_without_values; ---- 1 foo 2 bar +4 NULL statement ok drop table table_without_values; @@ -498,6 +501,14 @@ insert into table_without_values values(2, NULL); ---- 1 +query II +insert into table_without_values(field1) values(3); +---- +1 + +statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable +insert into table_without_values(field2) values(300); + statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable insert into table_without_values values(NULL, 300); @@ -509,6 +520,7 @@ select * from table_without_values; ---- 1 100 2 NULL +3 NULL statement ok drop table table_without_values; From f8870216ae397fa5309ac09e32668b8a9fa058c9 Mon Sep 17 00:00:00 2001 From: jonahgao Date: Mon, 13 Nov 2023 15:29:33 +0800 Subject: [PATCH 2/3] add test comment --- datafusion/sqllogictest/test_files/insert.slt | 3 +++ datafusion/sqllogictest/test_files/insert_to_external.slt | 3 +++ 2 files changed, 6 insertions(+) diff --git a/datafusion/sqllogictest/test_files/insert.slt b/datafusion/sqllogictest/test_files/insert.slt index fed55163e859e..a100b5ac6b854 100644 --- a/datafusion/sqllogictest/test_files/insert.slt +++ b/datafusion/sqllogictest/test_files/insert.slt @@ -258,6 +258,7 @@ insert into table_without_values(name, id) values(4, 'zoo'); statement error Error during planning: Column count doesn't match insert query! insert into table_without_values(id) values(4, 'zoo'); +# insert NULL values for the missing column (name) query IT insert into table_without_values(id) values(4); ---- @@ -288,11 +289,13 @@ insert into table_without_values values(2, NULL); ---- 1 +# insert NULL values for the missing column (field2) query II insert into table_without_values(field1) values(3); ---- 1 +# insert NULL values for the missing column (field1), but column is non-nullable statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable insert into table_without_values(field2) values(300); diff --git a/datafusion/sqllogictest/test_files/insert_to_external.slt b/datafusion/sqllogictest/test_files/insert_to_external.slt index 5a48686b69a80..44410362412c5 100644 --- a/datafusion/sqllogictest/test_files/insert_to_external.slt +++ b/datafusion/sqllogictest/test_files/insert_to_external.slt @@ -468,6 +468,7 @@ insert into table_without_values(name, id) values(4, 'zoo'); statement error Error during planning: Column count doesn't match insert query! insert into table_without_values(id) values(4, 'zoo'); +# insert NULL values for the missing column (name) query IT insert into table_without_values(id) values(4); ---- @@ -501,11 +502,13 @@ insert into table_without_values values(2, NULL); ---- 1 +# insert NULL values for the missing column (field2) query II insert into table_without_values(field1) values(3); ---- 1 +# insert NULL values for the missing column (field1), but column is non-nullable statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable insert into table_without_values(field2) values(300); From 8259d2176cabb1d2a0ebf6027f6e59bf4e19152c Mon Sep 17 00:00:00 2001 From: jonahgao Date: Mon, 13 Nov 2023 15:43:13 +0800 Subject: [PATCH 3/3] update to re-trigger ci --- datafusion/sqllogictest/test_files/insert.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/insert.slt b/datafusion/sqllogictest/test_files/insert.slt index a100b5ac6b854..9860bdcae05c1 100644 --- a/datafusion/sqllogictest/test_files/insert.slt +++ b/datafusion/sqllogictest/test_files/insert.slt @@ -289,7 +289,7 @@ insert into table_without_values values(2, NULL); ---- 1 -# insert NULL values for the missing column (field2) +# insert NULL values for the missing column (field2) query II insert into table_without_values(field1) values(3); ----