diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index ee31d9ead52e1..4c70595fc9566 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -69,7 +69,11 @@ use crate::logical_expr::{ LogicalPlanBuilder, SetVariable, TableSource, TableType, UNNAMED_TABLE, }; use crate::optimizer::OptimizerRule; -use datafusion_sql::{planner::ParserOptions, ResolvedTableReference, TableReference}; +use datafusion_sql::{ + parser::{CopyToSource, CopyToStatement}, + planner::ParserOptions, + ResolvedTableReference, TableReference, +}; use crate::physical_optimizer::coalesce_batches::CoalesceBatches; use crate::physical_optimizer::repartition::Repartition; @@ -1686,45 +1690,58 @@ impl SessionState { // table providers for all relations referenced in this query let mut relations = hashbrown::HashSet::with_capacity(10); - match statement { - DFStatement::Statement(s) => { - struct RelationVisitor<'a>(&'a mut hashbrown::HashSet); + struct RelationVisitor<'a>(&'a mut hashbrown::HashSet); + + impl<'a> RelationVisitor<'a> { + /// Record that `relation` was used in this statement + fn insert(&mut self, relation: &ObjectName) { + self.0.get_or_insert_with(relation, |_| relation.clone()); + } + } - impl<'a> Visitor for RelationVisitor<'a> { - type Break = (); + impl<'a> Visitor for RelationVisitor<'a> { + type Break = (); - fn pre_visit_relation( - &mut self, - relation: &ObjectName, - ) -> ControlFlow<()> { - self.0.get_or_insert_with(relation, |_| relation.clone()); - ControlFlow::Continue(()) - } + fn pre_visit_relation(&mut self, relation: &ObjectName) -> ControlFlow<()> { + self.insert(relation); + ControlFlow::Continue(()) + } - fn pre_visit_statement( - &mut self, - statement: &Statement, - ) -> ControlFlow<()> { - if let Statement::ShowCreate { - obj_type: ShowCreateObject::Table | ShowCreateObject::View, - obj_name, - } = statement - { - self.0.get_or_insert_with(obj_name, |_| obj_name.clone()); - } - ControlFlow::Continue(()) - } + fn pre_visit_statement(&mut self, statement: &Statement) -> ControlFlow<()> { + if let Statement::ShowCreate { + obj_type: ShowCreateObject::Table | ShowCreateObject::View, + obj_name, + } = statement + { + self.insert(obj_name) } - let mut visitor = RelationVisitor(&mut relations); + ControlFlow::Continue(()) + } + } + + let mut visitor = RelationVisitor(&mut relations); + match statement { + DFStatement::Statement(s) => { let _ = s.as_ref().visit(&mut visitor); } DFStatement::CreateExternalTable(table) => { - relations.insert(ObjectName(vec![Ident::from(table.name.as_str())])); - } - DFStatement::DescribeTableStmt(table) => { - relations - .get_or_insert_with(&table.table_name, |_| table.table_name.clone()); + visitor + .0 + .insert(ObjectName(vec![Ident::from(table.name.as_str())])); } + DFStatement::DescribeTableStmt(table) => visitor.insert(&table.table_name), + DFStatement::CopyTo(CopyToStatement { + source, + target: _, + options: _, + }) => match source { + CopyToSource::Relation(table_name) => { + visitor.insert(table_name); + } + CopyToSource::Query(query) => { + query.visit(&mut visitor); + } + }, } // Always include information_schema if available diff --git a/datafusion/core/tests/sqllogictests/test_files/copy.slt b/datafusion/core/tests/sqllogictests/test_files/copy.slt new file mode 100644 index 0000000000000..e7bde89d2940c --- /dev/null +++ b/datafusion/core/tests/sqllogictests/test_files/copy.slt @@ -0,0 +1,44 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# tests for copy command + +statement ok +create table source_table(col1 integer, col2 varchar) as values (1, 'Foo'), (2, 'Bar'); + +# Copy from table +statement error DataFusion error: This feature is not implemented: `COPY \.\. TO \.\.` statement is not yet supported +COPY source_table to '/tmp/table.parquet'; + +# Copy from table with options +statement error DataFusion error: This feature is not implemented: `COPY \.\. TO \.\.` statement is not yet supported +COPY source_table to '/tmp/table.parquet' (row_group_size 55); + +# Copy from table with options (and trailing comma) +statement error DataFusion error: This feature is not implemented: `COPY \.\. TO \.\.` statement is not yet supported +COPY source_table to '/tmp/table.parquet' (row_group_size 55, row_group_limit_bytes 9,); + + +# Error cases: + +# Incomplete statement +statement error DataFusion error: SQL error: ParserError\("Expected \), found: EOF"\) +COPY (select col2, sum(col1) from source_table + +# Copy from table with non literal +statement error DataFusion error: SQL error: ParserError\("Expected ',' or '\)' after option definition, found: \+"\) +COPY source_table to '/tmp/table.parquet' (row_group_size 55 + 102); diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs index a70868fa2f48c..a0e928910dca0 100644 --- a/datafusion/sql/src/parser.rs +++ b/datafusion/sql/src/parser.rs @@ -18,7 +18,8 @@ //! DataFusion SQL Parser based on [`sqlparser`] use datafusion_common::parsers::CompressionTypeVariant; -use sqlparser::ast::OrderByExpr; +use sqlparser::ast::{OrderByExpr, Query, Value}; +use sqlparser::tokenizer::Word; use sqlparser::{ ast::{ ColumnDef, ColumnOptionDef, ObjectName, Statement as SQLStatement, @@ -28,8 +29,9 @@ use sqlparser::{ parser::{Parser, ParserError}, tokenizer::{Token, TokenWithLocation, Tokenizer}, }; +use std::collections::VecDeque; +use std::fmt; use std::{collections::HashMap, str::FromStr}; -use std::{collections::VecDeque, fmt}; // Use `Parser::expected` instead, if possible macro_rules! parser_err { @@ -42,6 +44,78 @@ fn parse_file_type(s: &str) -> Result { Ok(s.to_uppercase()) } +/// DataFusion extension DDL for `COPY` +/// +/// # Syntax: +/// +/// ```text +/// COPY )> +/// TO +/// +/// (key_value_list) +/// ``` +/// +/// # Examples +/// +/// ```sql +/// COPY lineitem TO 'lineitem' +/// (format parquet, +/// partitions 16, +/// row_group_limit_rows 100000, +// row_group_limit_bytes 200000 +/// ) +/// +/// COPY (SELECT l_orderkey from lineitem) to 'lineitem.parquet'; +/// ``` +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CopyToStatement { + /// From where the data comes from + pub source: CopyToSource, + /// The URL to where the data is heading + pub target: String, + /// Target specific options + pub options: HashMap, +} + +impl fmt::Display for CopyToStatement { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let Self { + source, + target, + options, + } = self; + + write!(f, "COPY {source} TO {target}")?; + + if !options.is_empty() { + let mut opts: Vec<_> = + options.iter().map(|(k, v)| format!("{k} {v}")).collect(); + // print them in sorted order + opts.sort_unstable(); + write!(f, " ({})", opts.join(", "))?; + } + + Ok(()) + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum CopyToSource { + /// `COPY TO ...` + Relation(ObjectName), + /// COPY (...query...) TO ... + Query(Query), +} + +impl fmt::Display for CopyToSource { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + CopyToSource::Relation(r) => write!(f, "{r}"), + CopyToSource::Query(q) => write!(f, "({q})"), + } + } +} + /// DataFusion extension DDL for `CREATE EXTERNAL TABLE` /// /// Syntax: @@ -119,12 +193,25 @@ pub struct DescribeTableStmt { /// Tokens parsed by [`DFParser`] are converted into these values. #[derive(Debug, Clone, PartialEq, Eq)] pub enum Statement { - /// ANSI SQL AST node + /// ANSI SQL AST node (from sqlparser-rs) Statement(Box), /// Extension: `CREATE EXTERNAL TABLE` CreateExternalTable(CreateExternalTable), /// Extension: `DESCRIBE TABLE` DescribeTableStmt(DescribeTableStmt), + /// Extension: `COPY TO` + CopyTo(CopyToStatement), +} + +impl fmt::Display for Statement { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Statement::Statement(stmt) => write!(f, "{stmt}"), + Statement::CreateExternalTable(stmt) => write!(f, "{stmt}"), + Statement::DescribeTableStmt(_) => write!(f, "DESCRIBE TABLE ..."), + Statement::CopyTo(stmt) => write!(f, "{stmt}"), + } + } } /// DataFusion SQL Parser based on [`sqlparser`] @@ -213,6 +300,11 @@ impl<'a> DFParser<'a> { // use custom parsing self.parse_create() } + Keyword::COPY => { + // move one token forward + self.parser.next_token(); + self.parse_copy() + } Keyword::DESCRIBE => { // move one token forward self.parser.next_token(); @@ -244,6 +336,79 @@ impl<'a> DFParser<'a> { })) } + /// Parse a SQL `COPY TO` statement + pub fn parse_copy(&mut self) -> Result { + // parse as a query + let source = if self.parser.consume_token(&Token::LParen) { + let query = self.parser.parse_query()?; + self.parser.expect_token(&Token::RParen)?; + CopyToSource::Query(query) + } else { + // parse as table reference + let table_name = self.parser.parse_object_name()?; + CopyToSource::Relation(table_name) + }; + + self.parser.expect_keyword(Keyword::TO)?; + + let target = self.parser.parse_literal_string()?; + + // check for options in parens + let options = if self.parser.peek_token().token == Token::LParen { + self.parse_value_options()? + } else { + HashMap::new() + }; + + Ok(Statement::CopyTo(CopyToStatement { + source, + target, + options, + })) + } + + /// Parse the next token as a key name for an option list + /// + /// Note this is different than [`parse_literal_string`] + /// because it allows keywords as well as other non words + /// + /// [`parse_literal_string`]: sqlparser::parser::Parser::parse_literal_string + pub fn parse_option_key(&mut self) -> Result { + let next_token = self.parser.next_token(); + match next_token.token { + Token::Word(Word { value, .. }) => Ok(value), + Token::SingleQuotedString(s) => Ok(s), + Token::DoubleQuotedString(s) => Ok(s), + Token::EscapedStringLiteral(s) => Ok(s), + _ => self.parser.expected("key name", next_token), + } + } + + /// Parse the next token as a value for an option list + /// + /// Note this is different than [`parse_value`] as it allows any + /// word or keyword in this location. + /// + /// [`parse_value`]: sqlparser::parser::Parser::parse_value + pub fn parse_option_value(&mut self) -> Result { + let next_token = self.parser.next_token(); + match next_token.token { + Token::Word(Word { value, .. }) => Ok(Value::UnQuotedString(value)), + Token::SingleQuotedString(s) => Ok(Value::SingleQuotedString(s)), + Token::DoubleQuotedString(s) => Ok(Value::DoubleQuotedString(s)), + Token::EscapedStringLiteral(s) => Ok(Value::EscapedStringLiteral(s)), + Token::Number(ref n, l) => match n.parse() { + Ok(n) => Ok(Value::Number(n, l)), + // The tokenizer should have ensured `n` is an integer + // so this should not be possible + Err(e) => parser_err!(format!( + "Unexpected error: could not parse '{n}' as number: {e}" + )), + }, + _ => self.parser.expected("string or numeric value", next_token), + } + } + /// Parse a SQL `CREATE` statement handling `CREATE EXTERNAL TABLE` pub fn parse_create(&mut self) -> Result { if self.parser.parse_keyword(Keyword::EXTERNAL) { @@ -485,7 +650,7 @@ impl<'a> DFParser<'a> { } Keyword::OPTIONS => { ensure_not_set(&builder.options, "OPTIONS")?; - builder.options = Some(self.parse_options()?); + builder.options = Some(self.parse_string_options()?); } _ => { unreachable!() @@ -555,14 +720,42 @@ impl<'a> DFParser<'a> { } } - fn parse_options(&mut self) -> Result, ParserError> { - let mut options: HashMap = HashMap::new(); + /// Parses (key value) style options where the values are literal strings. + fn parse_string_options(&mut self) -> Result, ParserError> { + let mut options = HashMap::new(); self.parser.expect_token(&Token::LParen)?; loop { let key = self.parser.parse_literal_string()?; let value = self.parser.parse_literal_string()?; - options.insert(key.to_string(), value.to_string()); + options.insert(key, value); + let comma = self.parser.consume_token(&Token::Comma); + if self.parser.consume_token(&Token::RParen) { + // allow a trailing comma, even though it's not in standard + break; + } else if !comma { + return self.expected( + "',' or ')' after option definition", + self.parser.peek_token(), + ); + } + } + Ok(options) + } + + /// Parses (key value) style options into a map of String --> [`Value`]. + /// + /// Unlike [`Self::parse_string_options`], this method supports + /// keywords as key names as well as multiple value types such as + /// Numbers as well as Strings. + fn parse_value_options(&mut self) -> Result, ParserError> { + let mut options = HashMap::new(); + self.parser.expect_token(&Token::LParen)?; + + loop { + let key = self.parse_option_key()?; + let value = self.parse_option_value()?; + options.insert(key, value); let comma = self.parser.consume_token(&Token::Comma); if self.parser.consume_token(&Token::RParen) { // allow a trailing comma, even though it's not in standard @@ -602,7 +795,7 @@ mod tests { 1, "Expected to parse exactly one statement" ); - assert_eq!(statements[0], expected); + assert_eq!(statements[0], expected, "actual:\n{:#?}", statements[0]); Ok(()) } @@ -1074,4 +1267,134 @@ mod tests { Ok(()) } + + #[test] + fn copy_to_table_to_table() -> Result<(), ParserError> { + // positive case + let sql = "COPY foo TO bar"; + let expected = Statement::CopyTo(CopyToStatement { + source: object_name("foo"), + target: "bar".to_string(), + options: HashMap::new(), + }); + + assert_eq!(verified_stmt(sql), expected); + Ok(()) + } + + #[test] + fn copy_to_query_to_table() -> Result<(), ParserError> { + let statement = verified_stmt("SELECT 1"); + + // unwrap the various layers + let statement = if let Statement::Statement(statement) = statement { + *statement + } else { + panic!("Expected statement, got {statement:?}"); + }; + + let query = if let SQLStatement::Query(query) = statement { + *query + } else { + panic!("Expected query, got {statement:?}"); + }; + + let sql = "COPY (SELECT 1) TO bar"; + let expected = Statement::CopyTo(CopyToStatement { + source: CopyToSource::Query(query), + target: "bar".to_string(), + options: HashMap::new(), + }); + assert_eq!(verified_stmt(sql), expected); + Ok(()) + } + + #[test] + fn copy_to_options() -> Result<(), ParserError> { + let sql = "COPY foo TO bar (row_group_size 55)"; + let expected = Statement::CopyTo(CopyToStatement { + source: object_name("foo"), + target: "bar".to_string(), + options: HashMap::from([( + "row_group_size".to_string(), + Value::Number("55".to_string(), false), + )]), + }); + assert_eq!(verified_stmt(sql), expected); + Ok(()) + } + + #[test] + fn copy_to_multi_options() -> Result<(), ParserError> { + let sql = + "COPY foo TO bar (format parquet, row_group_size 55, compression snappy)"; + // canonical order is alphabetical + let canonical = + "COPY foo TO bar (compression snappy, format parquet, row_group_size 55)"; + + let expected_options = HashMap::from([ + ( + "compression".to_string(), + Value::UnQuotedString("snappy".to_string()), + ), + ( + "format".to_string(), + Value::UnQuotedString("parquet".to_string()), + ), + ( + "row_group_size".to_string(), + Value::Number("55".to_string(), false), + ), + ]); + + let options = + if let Statement::CopyTo(copy_to) = one_statement_parses_to(sql, canonical) { + copy_to.options + } else { + panic!("Expected copy"); + }; + + assert_eq!(options, expected_options); + + Ok(()) + } + + // For error cases, see: `copy.slt` + + fn object_name(name: &str) -> CopyToSource { + CopyToSource::Relation(ObjectName(vec![Ident::new(name)])) + } + + // Based on sqlparser-rs + // https://github.com/sqlparser-rs/sqlparser-rs/blob/ae3b5844c839072c235965fe0d1bddc473dced87/src/test_utils.rs#L104-L116 + + /// Ensures that `sql` parses as a single [Statement] + /// + /// If `canonical` is non empty,this function additionally asserts + /// that: + /// + /// 1. parsing `sql` results in the same [`Statement`] as parsing + /// `canonical`. + /// + /// 2. re-serializing the result of parsing `sql` produces the same + /// `canonical` sql string + fn one_statement_parses_to(sql: &str, canonical: &str) -> Statement { + let mut statements = DFParser::parse_sql(sql).unwrap(); + assert_eq!(statements.len(), 1); + + if sql != canonical { + assert_eq!(DFParser::parse_sql(canonical).unwrap(), statements); + } + + let only_statement = statements.pop_front().unwrap(); + assert_eq!(canonical, only_statement.to_string()); + only_statement + } + + /// Ensures that `sql` parses as a single [Statement], and that + /// re-serializing the parse result produces the same `sql` + /// string (is not modified after a serialization round-trip). + fn verified_stmt(sql: &str) -> Statement { + one_statement_parses_to(sql, sql) + } } diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index e2ff668e2dcab..3cc0e5d77701d 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -23,6 +23,7 @@ use sqlparser::ast::TableFactor; mod join; impl<'a, S: ContextProvider> SqlToRel<'a, S> { + /// Create a `LogicalPlan` that scans the named relation fn create_relation( &self, relation: TableFactor, diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 64c59eee87fd3..bd9ca3ffa83a2 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -16,7 +16,8 @@ // under the License. use crate::parser::{ - CreateExternalTable, DFParser, DescribeTableStmt, Statement as DFStatement, + CopyToStatement, CreateExternalTable, DFParser, DescribeTableStmt, + Statement as DFStatement, }; use crate::planner::{ object_name_to_qualifier, ContextProvider, PlannerContext, SqlToRel, @@ -85,6 +86,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { DFStatement::CreateExternalTable(s) => self.external_table_to_plan(s), DFStatement::Statement(s) => self.sql_statement_to_plan(*s), DFStatement::DescribeTableStmt(s) => self.describe_table_to_plan(s), + DFStatement::CopyTo(s) => self.copy_to_plan(s), } } @@ -537,6 +539,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { })) } + fn copy_to_plan(&self, _statement: CopyToStatement) -> Result { + // TODO: implement as part of https://github.com/apache/arrow-datafusion/issues/5654 + Err(DataFusionError::NotImplemented( + "`COPY .. TO ..` statement is not yet supported".to_string(), + )) + } + fn build_order_by( &self, order_exprs: Vec,