Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 29 additions & 20 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1698,30 +1698,39 @@ impl SessionState {
}

let mut visitor = RelationVisitor(&mut relations);
match statement {
DFStatement::Statement(s) => {
let _ = s.as_ref().visit(&mut visitor);
}
DFStatement::CreateExternalTable(table) => {
visitor
.0
.insert(ObjectName(vec![Ident::from(table.name.as_str())]));
}
DFStatement::DescribeTableStmt(table) => visitor.insert(&table.table_name),
DFStatement::CopyTo(CopyToStatement {
source,
target: _,
options: _,
}) => match source {
CopyToSource::Relation(table_name) => {
visitor.insert(table_name);
fn visit_statement(statement: &DFStatement, visitor: &mut RelationVisitor<'_>) {
match statement {
DFStatement::Statement(s) => {
let _ = s.as_ref().visit(visitor);
}
CopyToSource::Query(query) => {
query.visit(&mut visitor);
DFStatement::CreateExternalTable(table) => {
visitor
.0
.insert(ObjectName(vec![Ident::from(table.name.as_str())]));
}
},
DFStatement::DescribeTableStmt(table) => {
visitor.insert(&table.table_name)
}
DFStatement::CopyTo(CopyToStatement {
source,
target: _,
options: _,
}) => match source {
CopyToSource::Relation(table_name) => {
visitor.insert(table_name);
}
CopyToSource::Query(query) => {
query.visit(visitor);
}
},
DFStatement::Explain(explain) => {
visit_statement(&explain.statement, visitor)
}
}
}

visit_statement(statement, &mut visitor);

// Always include information_schema if available
if self.config.information_schema() {
for s in INFORMATION_SCHEMA_TABLES {
Expand Down
140 changes: 103 additions & 37 deletions datafusion/sql/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,35 @@ fn parse_file_type(s: &str) -> Result<String, ParserError> {
Ok(s.to_uppercase())
}

/// DataFusion specific EXPLAIN (needed so we can EXPLAIN datafusion
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the key thing -- we need a datafusion specific EXPLAIN statement

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason to keep these datafusion specific statements in general vs. moving this upstream to the sqlparser crate? This distinction was difficult for me to understand when first diving into how statements are parsed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the high level rationale is that sqlparser-rs intends to parse SQL for one or more existing "standard" SQL dialects (like MySQL or Postgres). However, these statements (COPY and CREATE EXTERNAL TABLE) are DataFusion-specific extensions. I will make a PR with some docs to try and clarify this rationale

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR with some docs: #7318

/// specific COPY and other statements)
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExplainStatement {
/// `true` when the `ANALYZE` keyword follows `EXPLAIN`
pub analyze: bool,
/// `true` when the `VERBOSE` keyword was given
pub verbose: bool,
/// The statement being explained. Boxed because [`Statement`] has an
/// `Explain` variant that holds this struct, making the type recursive.
pub statement: Box<Statement>,
}

impl fmt::Display for ExplainStatement {
    /// Renders the statement back to SQL text:
    /// `EXPLAIN [ANALYZE ] [VERBOSE ]<statement>`.
    ///
    /// The optional keywords are emitted in the same order the parser
    /// accepts them (`ANALYZE` before `VERBOSE`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("EXPLAIN ")?;

        if self.analyze {
            f.write_str("ANALYZE ")?;
        }

        if self.verbose {
            f.write_str("VERBOSE ")?;
        }

        // Delegate to the wrapped statement's own `Display` implementation.
        write!(f, "{}", self.statement)
    }
}

/// DataFusion extension DDL for `COPY`
///
/// # Syntax:
Expand Down Expand Up @@ -74,7 +103,7 @@ pub struct CopyToStatement {
/// The URL to where the data is heading
pub target: String,
/// Target specific options
pub options: HashMap<String, Value>,
pub options: Vec<(String, Value)>,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also changed the parser to preserve the order of the options so the output is consistent (rather than whatever order the hashmap decided). Without that, the CI failed because the output order was different on Windows than it was on Linux -- see this example

}

impl fmt::Display for CopyToStatement {
Expand All @@ -88,10 +117,8 @@ impl fmt::Display for CopyToStatement {
write!(f, "COPY {source} TO {target}")?;

if !options.is_empty() {
let mut opts: Vec<_> =
options.iter().map(|(k, v)| format!("{k} {v}")).collect();
let opts: Vec<_> = options.iter().map(|(k, v)| format!("{k} {v}")).collect();
// print them in the order they were specified
opts.sort_unstable();
write!(f, " ({})", opts.join(", "))?;
}

Expand Down Expand Up @@ -208,6 +235,8 @@ pub enum Statement {
DescribeTableStmt(DescribeTableStmt),
/// Extension: `COPY TO`
CopyTo(CopyToStatement),
/// EXPLAIN for extensions
Explain(ExplainStatement),
}

impl fmt::Display for Statement {
Expand All @@ -217,11 +246,12 @@ impl fmt::Display for Statement {
Statement::CreateExternalTable(stmt) => write!(f, "{stmt}"),
Statement::DescribeTableStmt(_) => write!(f, "DESCRIBE TABLE ..."),
Statement::CopyTo(stmt) => write!(f, "{stmt}"),
Statement::Explain(stmt) => write!(f, "{stmt}"),
}
}
}

/// DataFusion SQL Parser based on [`sqlparser`]
/// DataFusion SQL Parser based on [`sqlparser`]
///
/// Parses DataFusion's SQL dialect, often delegating to [`sqlparser`]'s
/// [`Parser`](sqlparser::parser::Parser).
Expand Down Expand Up @@ -307,24 +337,24 @@ impl<'a> DFParser<'a> {
Token::Word(w) => {
match w.keyword {
Keyword::CREATE => {
// move one token forward
self.parser.next_token();
// use custom parsing
self.parser.next_token(); // CREATE
self.parse_create()
}
Keyword::COPY => {
// move one token forward
self.parser.next_token();
self.parser.next_token(); // COPY
self.parse_copy()
}
Keyword::DESCRIBE => {
// move one token forward
self.parser.next_token();
// use custom parsing
self.parser.next_token(); // DESCRIBE
self.parse_describe()
}
Keyword::EXPLAIN => {
// (TODO parse all supported statements)
self.parser.next_token(); // EXPLAIN
self.parse_explain()
}
_ => {
// use the native parser
// use sqlparser-rs parser
Ok(Statement::Statement(Box::from(
self.parser.parse_statement()?,
)))
Expand Down Expand Up @@ -369,7 +399,7 @@ impl<'a> DFParser<'a> {
let options = if self.parser.peek_token().token == Token::LParen {
self.parse_value_options()?
} else {
HashMap::new()
vec![]
};

Ok(Statement::CopyTo(CopyToStatement {
Expand Down Expand Up @@ -421,6 +451,19 @@ impl<'a> DFParser<'a> {
}
}

/// Parse the body of a SQL `EXPLAIN [ANALYZE] [VERBOSE] <statement>`.
///
/// The statement dispatcher in this parser consumes the `EXPLAIN` keyword
/// before calling this method, so parsing starts at the optional modifiers.
/// `ANALYZE` is checked before `VERBOSE`. The explained statement is parsed
/// with [`Self::parse_statement`].
///
/// # Errors
///
/// Returns the [`ParserError`] produced while parsing the inner statement.
pub fn parse_explain(&mut self) -> Result<Statement, ParserError> {
    // Each optional keyword is consumed only if it is the next token.
    let analyze = self.parser.parse_keyword(Keyword::ANALYZE);
    let verbose = self.parser.parse_keyword(Keyword::VERBOSE);

    self.parse_statement().map(|inner| {
        Statement::Explain(ExplainStatement {
            analyze,
            verbose,
            statement: Box::new(inner),
        })
    })
}

/// Parse a SQL `CREATE` statement handling `CREATE EXTERNAL TABLE`
pub fn parse_create(&mut self) -> Result<Statement, ParserError> {
if self.parser.parse_keyword(Keyword::EXTERNAL) {
Expand Down Expand Up @@ -758,14 +801,14 @@ impl<'a> DFParser<'a> {
/// Unlike [`Self::parse_string_options`], this method supports
/// keywords as key names as well as multiple value types such as
/// Numbers as well as Strings.
fn parse_value_options(&mut self) -> Result<HashMap<String, Value>, ParserError> {
let mut options = HashMap::new();
fn parse_value_options(&mut self) -> Result<Vec<(String, Value)>, ParserError> {
let mut options = vec![];
self.parser.expect_token(&Token::LParen)?;

loop {
let key = self.parse_option_key()?;
let value = self.parse_option_value()?;
options.insert(key, value);
options.push((key, value));
let comma = self.parser.consume_token(&Token::Comma);
if self.parser.consume_token(&Token::RParen) {
// allow a trailing comma, even though it's not in standard
Expand Down Expand Up @@ -1285,13 +1328,39 @@ mod tests {
let expected = Statement::CopyTo(CopyToStatement {
source: object_name("foo"),
target: "bar".to_string(),
options: HashMap::new(),
options: vec![],
});

assert_eq!(verified_stmt(sql), expected);
Ok(())
}

/// `EXPLAIN` wrapping a DataFusion-specific `COPY` statement parses into
/// `Statement::Explain` for every combination of `ANALYZE` / `VERBOSE`.
#[test]
fn explain_copy_to_table_to_table() -> Result<(), ParserError> {
    // (sql text, expected `analyze` flag, expected `verbose` flag)
    let cases = [
        ("EXPLAIN COPY foo TO bar", false, false),
        ("EXPLAIN ANALYZE COPY foo TO bar", true, false),
        ("EXPLAIN VERBOSE COPY foo TO bar", false, true),
        ("EXPLAIN ANALYZE VERBOSE COPY foo TO bar", true, true),
    ];

    for (sql, analyze, verbose) in cases {
        println!("sql: {sql}, analyze: {analyze}, verbose: {verbose}");

        // The inner statement is the same for every case; only the flags vary.
        let inner = CopyToStatement {
            source: object_name("foo"),
            target: "bar".to_string(),
            options: vec![],
        };
        let expected = Statement::Explain(ExplainStatement {
            analyze,
            verbose,
            statement: Box::new(Statement::CopyTo(inner)),
        });

        assert_eq!(verified_stmt(sql), expected);
    }

    Ok(())
}

#[test]
fn copy_to_query_to_table() -> Result<(), ParserError> {
let statement = verified_stmt("SELECT 1");
Expand All @@ -1313,7 +1382,7 @@ mod tests {
let expected = Statement::CopyTo(CopyToStatement {
source: CopyToSource::Query(query),
target: "bar".to_string(),
options: HashMap::new(),
options: vec![],
});
assert_eq!(verified_stmt(sql), expected);
Ok(())
Expand All @@ -1325,28 +1394,22 @@ mod tests {
let expected = Statement::CopyTo(CopyToStatement {
source: object_name("foo"),
target: "bar".to_string(),
options: HashMap::from([(
options: vec![(
"row_group_size".to_string(),
Value::Number("55".to_string(), false),
)]),
)],
});
assert_eq!(verified_stmt(sql), expected);
Ok(())
}

#[test]
fn copy_to_multi_options() -> Result<(), ParserError> {
// order of options is preserved
let sql =
"COPY foo TO bar (format parquet, row_group_size 55, compression snappy)";
// canonical order is alphabetical
let canonical =
"COPY foo TO bar (compression snappy, format parquet, row_group_size 55)";

let expected_options = HashMap::from([
(
"compression".to_string(),
Value::UnQuotedString("snappy".to_string()),
),
let expected_options = vec![
(
"format".to_string(),
Value::UnQuotedString("parquet".to_string()),
Expand All @@ -1355,14 +1418,17 @@ mod tests {
"row_group_size".to_string(),
Value::Number("55".to_string(), false),
),
]);
(
"compression".to_string(),
Value::UnQuotedString("snappy".to_string()),
),
];

let options =
if let Statement::CopyTo(copy_to) = one_statement_parses_to(sql, canonical) {
copy_to.options
} else {
panic!("Expected copy");
};
let options = if let Statement::CopyTo(copy_to) = verified_stmt(sql) {
copy_to.options
} else {
panic!("Expected copy");
};

assert_eq!(options, expected_options);

Expand Down
22 changes: 17 additions & 5 deletions datafusion/sql/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use crate::parser::{
CopyToSource, CopyToStatement, CreateExternalTable, DFParser, DescribeTableStmt,
LexOrdering, Statement as DFStatement,
ExplainStatement, LexOrdering, Statement as DFStatement,
};
use crate::planner::{
object_name_to_qualifier, ContextProvider, PlannerContext, SqlToRel,
Expand Down Expand Up @@ -93,6 +93,11 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
DFStatement::Statement(s) => self.sql_statement_to_plan(*s),
DFStatement::DescribeTableStmt(s) => self.describe_table_to_plan(s),
DFStatement::CopyTo(s) => self.copy_to_plan(s),
DFStatement::Explain(ExplainStatement {
verbose,
analyze,
statement,
}) => self.explain_to_plan(verbose, analyze, *statement),
}
}

Expand All @@ -116,7 +121,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
format: _,
describe_alias: _,
..
} => self.explain_statement_to_plan(verbose, analyze, *statement),
} => {
self.explain_to_plan(verbose, analyze, DFStatement::Statement(statement))
}
Statement::Query(query) => self.query_to_plan(*query, planner_context),
Statement::ShowVariable { variable } => self.show_variable_to_plan(&variable),
Statement::SetVariable {
Expand Down Expand Up @@ -706,13 +713,18 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {

/// Generate a plan for EXPLAIN ... that will print out a plan
///
fn explain_statement_to_plan(
/// Note this is used for both the sqlparser `EXPLAIN` statement and the
/// DataFusion-specific `EXPLAIN` statement.
fn explain_to_plan(
&self,
verbose: bool,
analyze: bool,
statement: Statement,
statement: DFStatement,
Comment on lines +718 to +722
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use ExplainStatement as param?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So there are two places this is called -- one from the DataFusion ExplainStatement (introduced in this PR) and one from the SQL parser Explain. So I thought it best to support them both and simply pass the parameters on through that are actually used

) -> Result<LogicalPlan> {
let plan = self.sql_statement_to_plan(statement)?;
let plan = self.statement_to_plan(statement)?;
if matches!(plan, LogicalPlan::Explain(_)) {
return plan_err!("Nested EXPLAINs are not supported");
}
let plan = Arc::new(plan);
let schema = LogicalPlan::explain_schema();
let schema = schema.to_dfschema_ref()?;
Expand Down
Loading