Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 28 additions & 18 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
not_impl_err, plan_datafusion_err, plan_err, unqualified_field_not_found, Column,
Constraints, DFField, DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference,
Result, SchemaReference, TableReference, ToDFSchema,
Result, ScalarValue, SchemaReference, TableReference, ToDFSchema,
};
use datafusion_expr::dml::{CopyOptions, CopyTo};
use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check;
Expand Down Expand Up @@ -1087,9 +1087,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let arrow_schema = (*table_source.schema()).clone();
let table_schema = DFSchema::try_from(arrow_schema)?;

// Get insert fields and index_mapping
// The i-th field of the table is `fields[index_mapping[i]]`
let (fields, index_mapping) = if columns.is_empty() {
// Get the insert fields and the target table's value indices.
//
// If value_indices[i] = Some(j), the value of the i-th column of the target table is
// derived from the j-th output of the source.
//
// If value_indices[i] = None, no value is provided for the i-th column of the target table,
// and it should be filled with a default value later.
let (fields, value_indices) = if columns.is_empty() {
// Empty means we're inserting into all columns of the table
(
table_schema.fields().clone(),
Expand All @@ -1098,7 +1103,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.collect::<Vec<_>>(),
)
} else {
let mut mapping = vec![None; table_schema.fields().len()];
let mut value_indices = vec![None; table_schema.fields().len()];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the new name `value_indices` much better

let fields = columns
.into_iter()
.map(|c| self.normalizer.normalize(c))
Expand All @@ -1107,19 +1112,19 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
let column_index = table_schema
.index_of_column_by_name(None, &c)?
.ok_or_else(|| unqualified_field_not_found(&c, &table_schema))?;
if mapping[column_index].is_some() {
if value_indices[column_index].is_some() {
return Err(DataFusionError::SchemaError(
datafusion_common::SchemaError::DuplicateUnqualifiedField {
name: c,
},
));
} else {
mapping[column_index] = Some(i);
value_indices[column_index] = Some(i);
}
Ok(table_schema.field(column_index).clone())
})
.collect::<Result<Vec<DFField>>>()?;
(fields, mapping)
(fields, value_indices)
};

// infer types for Values clause... other types should be resolvable the regular way
Expand Down Expand Up @@ -1154,17 +1159,22 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
plan_err!("Column count doesn't match insert query!")?;
}

let exprs = index_mapping
let exprs = value_indices
.into_iter()
.flatten()
.map(|i| {
let target_field = &fields[i];
let source_field = source.schema().field(i);
let expr =
datafusion_expr::Expr::Column(source_field.unqualified_column())
.cast_to(target_field.data_type(), source.schema())?
.alias(target_field.name());
Ok(expr)
.enumerate()
.map(|(i, value_index)| {
let target_field = table_schema.field(i);
let expr = match value_index {
Some(v) => {
let source_field = source.schema().field(v);
datafusion_expr::Expr::Column(source_field.qualified_column())
.cast_to(target_field.data_type(), source.schema())?
}
// Fill in the default value for the column; currently only NULL is supported.
None => datafusion_expr::Expr::Literal(ScalarValue::Null)
.cast_to(target_field.data_type(), &DFSchema::empty())?,
};
Ok(expr.alias(target_field.name()))
})
.collect::<Result<Vec<datafusion_expr::Expr>>>()?;
let source = project(source, exprs)?;
Expand Down
33 changes: 15 additions & 18 deletions datafusion/sql/tests/sql_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,12 +422,11 @@ CopyTo: format=csv output_url=output.csv single_file_output=true options: ()
fn plan_insert() {
let sql =
"insert into person (id, first_name, last_name) values (1, 'Alan', 'Turing')";
let plan = r#"
Dml: op=[Insert Into] table=[person]
Projection: CAST(column1 AS UInt32) AS id, column2 AS first_name, column3 AS last_name
Values: (Int64(1), Utf8("Alan"), Utf8("Turing"))
"#
.trim();
let plan = "Dml: op=[Insert Into] table=[person]\
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

\n Projection: CAST(column1 AS UInt32) AS id, column2 AS first_name, column3 AS last_name, \
CAST(NULL AS Int32) AS age, CAST(NULL AS Utf8) AS state, CAST(NULL AS Float64) AS salary, \
CAST(NULL AS Timestamp(Nanosecond, None)) AS birth_date, CAST(NULL AS Int32) AS 😀\
\n Values: (Int64(1), Utf8(\"Alan\"), Utf8(\"Turing\"))";
quick_test(sql, plan);
}

Expand Down Expand Up @@ -4037,12 +4036,11 @@ Dml: op=[Update] table=[person]
fn test_prepare_statement_insert_infer() {
let sql = "insert into person (id, first_name, last_name) values ($1, $2, $3)";

let expected_plan = r#"
Dml: op=[Insert Into] table=[person]
Projection: column1 AS id, column2 AS first_name, column3 AS last_name
Values: ($1, $2, $3)
"#
.trim();
let expected_plan = "Dml: op=[Insert Into] table=[person]\
\n Projection: column1 AS id, column2 AS first_name, column3 AS last_name, \
CAST(NULL AS Int32) AS age, CAST(NULL AS Utf8) AS state, CAST(NULL AS Float64) AS salary, \
CAST(NULL AS Timestamp(Nanosecond, None)) AS birth_date, CAST(NULL AS Int32) AS 😀\
\n Values: ($1, $2, $3)";

let expected_dt = "[Int32]";
let plan = prepare_stmt_quick_test(sql, expected_plan, expected_dt);
Expand All @@ -4061,12 +4059,11 @@ Dml: op=[Insert Into] table=[person]
ScalarValue::Utf8(Some("Alan".to_string())),
ScalarValue::Utf8(Some("Turing".to_string())),
];
let expected_plan = r#"
Dml: op=[Insert Into] table=[person]
Projection: column1 AS id, column2 AS first_name, column3 AS last_name
Values: (UInt32(1), Utf8("Alan"), Utf8("Turing"))
"#
.trim();
let expected_plan = "Dml: op=[Insert Into] table=[person]\
\n Projection: column1 AS id, column2 AS first_name, column3 AS last_name, \
CAST(NULL AS Int32) AS age, CAST(NULL AS Utf8) AS state, CAST(NULL AS Float64) AS salary, \
CAST(NULL AS Timestamp(Nanosecond, None)) AS birth_date, CAST(NULL AS Int32) AS 😀\
\n Values: (UInt32(1), Utf8(\"Alan\"), Utf8(\"Turing\"))";
let plan = plan.replace_params_with_values(&param_values).unwrap();

prepare_stmt_replace_params_quick_test(plan, param_values, expected_plan);
Expand Down
17 changes: 16 additions & 1 deletion datafusion/sqllogictest/test_files/insert.slt
Original file line number Diff line number Diff line change
Expand Up @@ -258,14 +258,18 @@ insert into table_without_values(name, id) values(4, 'zoo');
statement error Error during planning: Column count doesn't match insert query!
insert into table_without_values(id) values(4, 'zoo');

statement error Error during planning: Inserting query must have the same schema with the table.
# insert NULL values for the missing column (name)
query IT
insert into table_without_values(id) values(4);
----
1

query IT rowsort
select * from table_without_values;
----
1 foo
2 bar
4 NULL

statement ok
drop table table_without_values;
Expand All @@ -285,6 +289,16 @@ insert into table_without_values values(2, NULL);
----
1

# insert NULL values for the missing column (field2)
query II
insert into table_without_values(field1) values(3);
----
1

# insert NULL values for the missing column (field1), but column is non-nullable
statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
insert into table_without_values(field2) values(300);

statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
insert into table_without_values values(NULL, 300);

Expand All @@ -296,6 +310,7 @@ select * from table_without_values;
----
1 100
2 NULL
3 NULL

statement ok
drop table table_without_values;
17 changes: 16 additions & 1 deletion datafusion/sqllogictest/test_files/insert_to_external.slt
Original file line number Diff line number Diff line change
Expand Up @@ -468,14 +468,18 @@ insert into table_without_values(name, id) values(4, 'zoo');
statement error Error during planning: Column count doesn't match insert query!
insert into table_without_values(id) values(4, 'zoo');

statement error Error during planning: Inserting query must have the same schema with the table.
# insert NULL values for the missing column (name)
query IT
insert into table_without_values(id) values(4);
----
1

query IT rowsort
select * from table_without_values;
----
1 foo
2 bar
4 NULL

statement ok
drop table table_without_values;
Expand All @@ -498,6 +502,16 @@ insert into table_without_values values(2, NULL);
----
1

# insert NULL values for the missing column (field2)
query II
insert into table_without_values(field1) values(3);
----
1

# insert NULL values for the missing column (field1), but column is non-nullable
statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
insert into table_without_values(field2) values(300);

statement error Execution error: Invalid batch column at '0' has null but schema specifies non-nullable
insert into table_without_values values(NULL, 300);

Expand All @@ -509,6 +523,7 @@ select * from table_without_values;
----
1 100
2 NULL
3 NULL

statement ok
drop table table_without_values;