diff --git a/rust/datafusion/examples/flight_server.rs b/rust/datafusion/examples/flight_server.rs index 1917a9e8a5a..a601b7cafdd 100644 --- a/rust/datafusion/examples/flight_server.rs +++ b/rust/datafusion/examples/flight_server.rs @@ -178,7 +178,7 @@ impl FlightService for FlightServiceImpl { } } -fn to_tonic_err(e: &datafusion::error::ExecutionError) -> Status { +fn to_tonic_err(e: &datafusion::error::DataFusionError) -> Status { Status::internal(format!("{:?}", e)) } diff --git a/rust/datafusion/src/datasource/csv.rs b/rust/datafusion/src/datasource/csv.rs index 45c1fbb0b1c..b0317c16d33 100644 --- a/rust/datafusion/src/datasource/csv.rs +++ b/rust/datafusion/src/datasource/csv.rs @@ -38,7 +38,7 @@ use std::string::String; use std::sync::Arc; use crate::datasource::TableProvider; -use crate::error::{ExecutionError, Result}; +use crate::error::{DataFusionError, Result}; use crate::physical_plan::csv::CsvExec; pub use crate::physical_plan::csv::CsvReadOptions; use crate::physical_plan::{common, ExecutionPlan}; @@ -62,7 +62,7 @@ impl CsvFile { let mut filenames: Vec = vec![]; common::build_file_list(path, &mut filenames, options.file_extension)?; if filenames.is_empty() { - return Err(ExecutionError::General("No files found".to_string())); + return Err(DataFusionError::Plan("No files found".to_string())); } CsvExec::try_infer_schema(&filenames, &options)? 
} diff --git a/rust/datafusion/src/datasource/memory.rs b/rust/datafusion/src/datasource/memory.rs index 28dc2a3da07..12db5b5e9e8 100644 --- a/rust/datafusion/src/datasource/memory.rs +++ b/rust/datafusion/src/datasource/memory.rs @@ -25,7 +25,7 @@ use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::record_batch::RecordBatch; use crate::datasource::TableProvider; -use crate::error::{ExecutionError, Result}; +use crate::error::{DataFusionError, Result}; use crate::physical_plan::common; use crate::physical_plan::memory::MemoryExec; use crate::physical_plan::ExecutionPlan; @@ -49,7 +49,7 @@ impl MemTable { batches: partitions, }) } else { - Err(ExecutionError::General( + Err(DataFusionError::Plan( "Mismatch between schema and batches".to_string(), )) } @@ -112,7 +112,7 @@ impl TableProvider for MemTable { if *i < self.schema.fields().len() { Ok(self.schema.field(*i).clone()) } else { - Err(ExecutionError::General( + Err(DataFusionError::Internal( "Projection index out of range".to_string(), )) } @@ -217,7 +217,7 @@ mod tests { let projection: Vec = vec![0, 4]; match provider.scan(&Some(projection), 1024) { - Err(ExecutionError::General(e)) => { + Err(DataFusionError::Internal(e)) => { assert_eq!("\"Projection index out of range\"", format!("{:?}", e)) } _ => assert!(false, "Scan should failed on invalid projection"), @@ -250,7 +250,7 @@ mod tests { )?; match MemTable::new(schema2, vec![vec![batch]]) { - Err(ExecutionError::General(e)) => assert_eq!( + Err(DataFusionError::Plan(e)) => assert_eq!( "\"Mismatch between schema and batches\"", format!("{:?}", e) ), diff --git a/rust/datafusion/src/error.rs b/rust/datafusion/src/error.rs index 6f903025dee..b4c8dcc026b 100644 --- a/rust/datafusion/src/error.rs +++ b/rust/datafusion/src/error.rs @@ -19,109 +19,102 @@ use std::error; use std::fmt::{Display, Formatter}; -use std::io::Error; +use std::io; use std::result; use arrow::error::ArrowError; use parquet::errors::ParquetError; use 
sqlparser::parser::ParserError; -/// Result type for operations that could result in an `ExecutionError` -pub type Result = result::Result; +/// Result type for operations that could result in an [DataFusionError] +pub type Result = result::Result; /// DataFusion error #[derive(Debug)] #[allow(missing_docs)] -pub enum ExecutionError { - /// Wraps an error from the Arrow crate +pub enum DataFusionError { + /// Error returned by arrow. ArrowError(ArrowError), /// Wraps an error from the Parquet crate ParquetError(ParquetError), - /// I/O error - IoError(Error), - /// SQL parser error - ParserError(ParserError), - /// General error - General(String), - /// Invalid column error - InvalidColumn(String), - /// Missing functionality + /// Error associated to I/O operations and associated traits. + IoError(io::Error), + /// Error returned when SQL is syntatically incorrect. + SQL(ParserError), + /// Error returned on a branch that we know it is possible + /// but to which we still have no implementation for. + /// Often, these errors are tracked in our issue tracker. NotImplemented(String), - /// Internal error - InternalError(String), - /// Query engine execution error - ExecutionError(String), + /// Error returned as a consequence of an error in DataFusion. + /// This error should not happen in normal usage of DataFusion. + // DataFusions has internal invariants that we are unable to ask the compiler to check for us. + // This error is raised when one of those invariants is not verified during execution. + Internal(String), + /// This error happens whenever a plan is not valid. Examples include + /// impossible casts, schema inference not possible and non-unique column names. + Plan(String), + /// Error returned during execution of the query. + /// Examples include files not found, errors in parsing certain types. + Execution(String), } -impl ExecutionError { - /// Wraps this `ExecutionError` in arrow's `ExternalError` variant. 
+impl DataFusionError { + /// Wraps this [DataFusionError] as an [Arrow::error::ArrowError]. pub fn into_arrow_external_error(self) -> ArrowError { ArrowError::from_external_error(Box::new(self)) } } -impl From for ExecutionError { - fn from(e: Error) -> Self { - ExecutionError::IoError(e) +impl From for DataFusionError { + fn from(e: io::Error) -> Self { + DataFusionError::IoError(e) } } -impl From for ExecutionError { - fn from(e: String) -> Self { - ExecutionError::General(e) - } -} - -impl From<&'static str> for ExecutionError { - fn from(e: &'static str) -> Self { - ExecutionError::General(e.to_string()) - } -} - -impl From for ExecutionError { +impl From for DataFusionError { fn from(e: ArrowError) -> Self { - ExecutionError::ArrowError(e) + DataFusionError::ArrowError(e) } } -impl From for ExecutionError { +impl From for DataFusionError { fn from(e: ParquetError) -> Self { - ExecutionError::ParquetError(e) + DataFusionError::ParquetError(e) } } -impl From for ExecutionError { +impl From for DataFusionError { fn from(e: ParserError) -> Self { - ExecutionError::ParserError(e) + DataFusionError::SQL(e) } } -impl Display for ExecutionError { +impl Display for DataFusionError { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match *self { - ExecutionError::ArrowError(ref desc) => write!(f, "Arrow error: {}", desc), - ExecutionError::ParquetError(ref desc) => { + DataFusionError::ArrowError(ref desc) => write!(f, "Arrow error: {}", desc), + DataFusionError::ParquetError(ref desc) => { write!(f, "Parquet error: {}", desc) } - ExecutionError::IoError(ref desc) => write!(f, "IO error: {}", desc), - ExecutionError::ParserError(ref desc) => { - write!(f, "Parser error: {:?}", desc) + DataFusionError::IoError(ref desc) => write!(f, "IO error: {}", desc), + DataFusionError::SQL(ref desc) => { + write!(f, "SQL error: {:?}", desc) } - ExecutionError::General(ref desc) => write!(f, "General error: {}", desc), - ExecutionError::InvalidColumn(ref desc) => { - 
write!(f, "Invalid column error: {}", desc) + DataFusionError::NotImplemented(ref desc) => { + write!(f, "This feature is not implemented: {}", desc) } - ExecutionError::NotImplemented(ref desc) => { - write!(f, "NotImplemented: {}", desc) + DataFusionError::Internal(ref desc) => { + write!(f, "Internal error: {}. This was likely caused by a bug in DataFusion's \ + code and we would welcome that you file an bug report in our issue tracker", desc) } - ExecutionError::InternalError(ref desc) => { - write!(f, "Internal error: {}", desc) + DataFusionError::Plan(ref desc) => { + write!(f, "Error during planning: {}", desc) } - ExecutionError::ExecutionError(ref desc) => { + DataFusionError::Execution(ref desc) => { write!(f, "Execution error: {}", desc) } } } } -impl error::Error for ExecutionError {} +impl error::Error for DataFusionError {} diff --git a/rust/datafusion/src/execution/context.rs b/rust/datafusion/src/execution/context.rs index ce7c15975ef..0e2425dbec9 100644 --- a/rust/datafusion/src/execution/context.rs +++ b/rust/datafusion/src/execution/context.rs @@ -32,7 +32,7 @@ use arrow::record_batch::RecordBatch; use crate::datasource::csv::CsvFile; use crate::datasource::parquet::ParquetTable; use crate::datasource::TableProvider; -use crate::error::{ExecutionError, Result}; +use crate::error::{DataFusionError, Result}; use crate::execution::dataframe_impl::DataFrameImpl; use crate::logical_plan::{FunctionRegistry, LogicalPlan, LogicalPlanBuilder}; use crate::optimizer::filter_push_down::FilterPushDown; @@ -148,7 +148,7 @@ impl ExecutionContext { let plan = LogicalPlanBuilder::empty().build()?; Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan))) } - _ => Err(ExecutionError::ExecutionError(format!( + _ => Err(DataFusionError::NotImplemented(format!( "Unsupported file type {:?}.", file_type ))), @@ -164,7 +164,7 @@ impl ExecutionContext { let statements = DFParser::parse_sql(sql)?; if statements.len() != 1 { - return 
Err(ExecutionError::NotImplemented(format!( + return Err(DataFusionError::NotImplemented(format!( "The context currently only supports a single SQL statement", ))); } @@ -288,7 +288,7 @@ impl ExecutionContext { &LogicalPlanBuilder::from(&table_scan).build()?, ))) } - _ => Err(ExecutionError::General(format!( + _ => Err(DataFusionError::Plan(format!( "No table named '{}'", table_name ))), @@ -364,7 +364,7 @@ impl ExecutionContext { .map(|batch| writer.write(&batch?)) .try_collect() .await - .map_err(|e| ExecutionError::from(e))?; + .map_err(|e| DataFusionError::from(e))?; } Ok(()) } @@ -501,7 +501,7 @@ impl FunctionRegistry for ExecutionContextState { fn udf(&self, name: &str) -> Result<&ScalarUDF> { let result = self.scalar_functions.get(name); if result.is_none() { - Err(ExecutionError::General( + Err(DataFusionError::Plan( format!("There is no UDF named \"{}\" in the registry", name).to_string(), )) } else { @@ -512,7 +512,7 @@ impl FunctionRegistry for ExecutionContextState { fn udaf(&self, name: &str) -> Result<&AggregateUDF> { let result = self.aggregate_functions.get(name); if result.is_none() { - Err(ExecutionError::General( + Err(DataFusionError::Plan( format!("There is no UDAF named \"{}\" in the registry", name) .to_string(), )) @@ -1414,7 +1414,7 @@ mod tests { _logical_plan: &LogicalPlan, _ctx_state: &ExecutionContextState, ) -> Result> { - Err(ExecutionError::NotImplemented( + Err(DataFusionError::NotImplemented( "query not supported".to_string(), )) } diff --git a/rust/datafusion/src/logical_plan/mod.rs b/rust/datafusion/src/logical_plan/mod.rs index 6df92fe190e..e1bf5fdb1e0 100644 --- a/rust/datafusion/src/logical_plan/mod.rs +++ b/rust/datafusion/src/logical_plan/mod.rs @@ -32,7 +32,7 @@ use arrow::{ use crate::datasource::parquet::ParquetTable; use crate::datasource::TableProvider; -use crate::error::{ExecutionError, Result}; +use crate::error::{DataFusionError, Result}; use crate::{ datasource::csv::{CsvFile, CsvReadOptions}, 
physical_plan::udaf::AggregateUDF, @@ -115,7 +115,7 @@ fn create_name(e: &Expr, input_schema: &Schema) -> Result { } Ok(format!("{}({})", fun.name, names.join(","))) } - other => Err(ExecutionError::NotImplemented(format!( + other => Err(DataFusionError::NotImplemented(format!( "Physical plan does not support logical expression {:?}", other ))), @@ -275,7 +275,7 @@ impl Expr { &right.get_type(schema)?, ), Expr::Sort { ref expr, .. } => expr.get_type(schema), - Expr::Wildcard => Err(ExecutionError::General( + Expr::Wildcard => Err(DataFusionError::Internal( "Wildcard expressions are not valid in a logical query plan".to_owned(), )), Expr::Nested(e) => e.get_type(schema), @@ -309,7 +309,7 @@ impl Expr { } => Ok(left.nullable(input_schema)? || right.nullable(input_schema)?), Expr::Sort { ref expr, .. } => expr.nullable(input_schema), Expr::Nested(e) => e.nullable(input_schema), - Expr::Wildcard => Err(ExecutionError::General( + Expr::Wildcard => Err(DataFusionError::Internal( "Wildcard expressions are not valid in a logical query plan".to_owned(), )), } @@ -347,7 +347,7 @@ impl Expr { data_type: cast_to_type.clone(), }) } else { - Err(ExecutionError::General(format!( + Err(DataFusionError::Plan(format!( "Cannot automatically convert {:?} to {:?}", this_type, cast_to_type ))) @@ -1270,7 +1270,7 @@ fn validate_unique_names( Ok(()) }, Some((existing_position, existing_expr)) => { - Err(ExecutionError::General( + Err(DataFusionError::Plan( format!("{} require unique expression names \ but the expression \"{:?}\" at position {} and \"{:?}\" \ at position {} have the same name. Consider aliasing (\"AS\") one of them.", @@ -1445,14 +1445,14 @@ mod tests { .project(vec![col("id"), col("first_name").alias("id")]); match plan { - Err(ExecutionError::General(e)) => { + Err(DataFusionError::Plan(e)) => { assert_eq!(e, "Projections require unique expression names \ but the expression \"#id\" at position 0 and \"#first_name AS id\" at \ position 1 have the same name. 
Consider aliasing (\"AS\") one of them."); Ok(()) } - _ => Err(ExecutionError::General( - "Plan should have returned an ExecutionError::General".to_string(), + _ => Err(DataFusionError::Plan( + "Plan should have returned an DataFusionError::Plan".to_string(), )), } } @@ -1469,14 +1469,14 @@ mod tests { .aggregate(vec![col("state")], vec![sum(col("salary")).alias("state")]); match plan { - Err(ExecutionError::General(e)) => { + Err(DataFusionError::Plan(e)) => { assert_eq!(e, "Aggregations require unique expression names \ but the expression \"#state\" at position 0 and \"SUM(#salary) AS state\" at \ position 1 have the same name. Consider aliasing (\"AS\") one of them."); Ok(()) } - _ => Err(ExecutionError::General( - "Plan should have returned an ExecutionError::General".to_string(), + _ => Err(DataFusionError::Plan( + "Plan should have returned an DataFusionError::Plan".to_string(), )), } } diff --git a/rust/datafusion/src/optimizer/projection_push_down.rs b/rust/datafusion/src/optimizer/projection_push_down.rs index 0bd46ee235c..ce4ebd38c7a 100644 --- a/rust/datafusion/src/optimizer/projection_push_down.rs +++ b/rust/datafusion/src/optimizer/projection_push_down.rs @@ -18,7 +18,7 @@ //! Projection Push Down optimizer rule ensures that only referenced columns are //! 
loaded into memory -use crate::error::{ExecutionError, Result}; +use crate::error::{DataFusionError, Result}; use crate::logical_plan::LogicalPlan; use crate::optimizer::optimizer::OptimizerRule; use crate::optimizer::utils; @@ -62,7 +62,7 @@ fn get_projected_schema( has_projection: bool, ) -> Result<(Vec, SchemaRef)> { if projection.is_some() { - return Err(ExecutionError::General( + return Err(DataFusionError::Internal( "Cannot run projection push-down rule more than once".to_string(), )); } diff --git a/rust/datafusion/src/optimizer/utils.rs b/rust/datafusion/src/optimizer/utils.rs index d45c18ac27d..3b83fcbd939 100644 --- a/rust/datafusion/src/optimizer/utils.rs +++ b/rust/datafusion/src/optimizer/utils.rs @@ -22,7 +22,7 @@ use std::{collections::HashSet, sync::Arc}; use arrow::datatypes::{Schema, SchemaRef}; use super::optimizer::OptimizerRule; -use crate::error::{ExecutionError, Result}; +use crate::error::{DataFusionError, Result}; use crate::logical_plan::{Expr, LogicalPlan, PlanType, StringifiedPlan}; /// Recursively walk a list of expression trees, collecting the unique set of column @@ -68,7 +68,7 @@ pub fn expr_to_column_names(expr: &Expr, accum: &mut HashSet) -> Result< Expr::AggregateUDF { args, .. } => exprlist_to_column_names(args, accum), Expr::ScalarFunction { args, .. } => exprlist_to_column_names(args, accum), Expr::ScalarUDF { args, .. } => exprlist_to_column_names(args, accum), - Expr::Wildcard => Err(ExecutionError::General( + Expr::Wildcard => Err(DataFusionError::Internal( "Wildcard expressions are not valid in a logical query plan".to_owned(), )), Expr::Nested(e) => expr_to_column_names(e, accum), @@ -215,7 +215,7 @@ pub fn expr_sub_expressions(expr: &Expr) -> Result> { Expr::ScalarVariable(_) => Ok(vec![]), Expr::Not(expr) => Ok(vec![expr]), Expr::Sort { expr, .. } => Ok(vec![expr]), - Expr::Wildcard { .. } => Err(ExecutionError::General( + Expr::Wildcard { .. 
} => Err(DataFusionError::Internal( "Wildcard expressions are not valid in a logical query plan".to_owned(), )), Expr::Nested(expr) => Ok(vec![expr]), @@ -268,7 +268,7 @@ pub fn rewrite_expression(expr: &Expr, expressions: &Vec) -> Result asc: asc.clone(), nulls_first: nulls_first.clone(), }), - Expr::Wildcard { .. } => Err(ExecutionError::General( + Expr::Wildcard { .. } => Err(DataFusionError::Internal( "Wildcard expressions are not valid in a logical query plan".to_owned(), )), Expr::Nested(_) => Ok(Expr::Nested(Box::new(expressions[0].clone()))), diff --git a/rust/datafusion/src/physical_plan/aggregates.rs b/rust/datafusion/src/physical_plan/aggregates.rs index d417c41855d..eba8c7552a4 100644 --- a/rust/datafusion/src/physical_plan/aggregates.rs +++ b/rust/datafusion/src/physical_plan/aggregates.rs @@ -31,7 +31,7 @@ use super::{ type_coercion::{coerce, data_types}, Accumulator, AggregateExpr, PhysicalExpr, }; -use crate::error::{ExecutionError, Result}; +use crate::error::{DataFusionError, Result}; use crate::physical_plan::distinct_expressions; use crate::physical_plan::expressions; use arrow::datatypes::{DataType, Schema}; @@ -70,7 +70,7 @@ impl fmt::Display for AggregateFunction { } impl FromStr for AggregateFunction { - type Err = ExecutionError; + type Err = DataFusionError; fn from_str(name: &str) -> Result { Ok(match &*name.to_uppercase() { "MIN" => AggregateFunction::Min, @@ -79,7 +79,7 @@ impl FromStr for AggregateFunction { "AVG" => AggregateFunction::Avg, "SUM" => AggregateFunction::Sum, _ => { - return Err(ExecutionError::General(format!( + return Err(DataFusionError::Plan(format!( "There is no built-in function named {}", name ))) @@ -142,7 +142,7 @@ pub fn create_aggregate_expr( Arc::new(expressions::Sum::new(arg, name, return_type)) } (AggregateFunction::Sum, true) => { - return Err(ExecutionError::NotImplemented( + return Err(DataFusionError::NotImplemented( "SUM(DISTINCT) aggregations are not available".to_string(), )); } @@ -156,7 +156,7 @@ 
pub fn create_aggregate_expr( Arc::new(expressions::Avg::new(arg, name, return_type)) } (AggregateFunction::Avg, true) => { - return Err(ExecutionError::NotImplemented( + return Err(DataFusionError::NotImplemented( "AVG(DISTINCT) aggregations are not available".to_string(), )); } diff --git a/rust/datafusion/src/physical_plan/array_expressions.rs b/rust/datafusion/src/physical_plan/array_expressions.rs index 79fb64e795a..528cbb3da8d 100644 --- a/rust/datafusion/src/physical_plan/array_expressions.rs +++ b/rust/datafusion/src/physical_plan/array_expressions.rs @@ -17,7 +17,7 @@ //! Array expressions -use crate::error::{ExecutionError, Result}; +use crate::error::{DataFusionError, Result}; use arrow::array::*; use arrow::datatypes::DataType; use std::sync::Arc; @@ -28,7 +28,7 @@ macro_rules! downcast_vec { .iter() .map(|e| match e.as_any().downcast_ref::<$ARRAY_TYPE>() { Some(array) => Ok(array), - _ => Err(ExecutionError::General("failed to downcast".to_string())), + _ => Err(DataFusionError::Internal("failed to downcast".to_string())), }) }}; } @@ -62,7 +62,7 @@ macro_rules! array { pub fn array(args: &[ArrayRef]) -> Result { // do not accept 0 arguments. 
if args.len() == 0 { - return Err(ExecutionError::InternalError( + return Err(DataFusionError::Internal( "array requires at least one argument".to_string(), )); } @@ -81,7 +81,7 @@ pub fn array(args: &[ArrayRef]) -> Result { DataType::UInt16 => array!(args, UInt16Array, UInt16Builder), DataType::UInt32 => array!(args, UInt32Array, UInt32Builder), DataType::UInt64 => array!(args, UInt64Array, UInt64Builder), - data_type => Err(ExecutionError::NotImplemented(format!( + data_type => Err(DataFusionError::NotImplemented(format!( "Array is not implemented for type '{:?}'.", data_type ))), diff --git a/rust/datafusion/src/physical_plan/common.rs b/rust/datafusion/src/physical_plan/common.rs index 8bf9bafb457..7c4f2179024 100644 --- a/rust/datafusion/src/physical_plan/common.rs +++ b/rust/datafusion/src/physical_plan/common.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use super::{RecordBatchStream, SendableRecordBatchStream}; -use crate::error::{ExecutionError, Result}; +use crate::error::{DataFusionError, Result}; use array::{ BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, @@ -84,7 +84,7 @@ pub async fn collect(stream: SendableRecordBatchStream) -> Result>() .await - .map_err(|e| ExecutionError::from(e)) + .map_err(|e| DataFusionError::from(e)) } /// Recursively build a list of files in a directory with a given extension @@ -107,7 +107,7 @@ pub fn build_file_list(dir: &str, filenames: &mut Vec, ext: &str) -> Res } } } else { - return Err(ExecutionError::General("Invalid path".to_string())); + return Err(DataFusionError::Plan("Invalid path".to_string())); } } } @@ -159,12 +159,12 @@ pub fn create_batch_empty(schema: &Schema) -> ArrowResult { DataType::Boolean => { Ok(Arc::new(BooleanArray::from(vec![] as Vec)) as ArrayRef) } - _ => Err(ExecutionError::NotImplemented(format!( + _ => Err(DataFusionError::NotImplemented(format!( "Cannot convert datatype {:?} to array", f.data_type() ))), }) .collect::>() - 
.map_err(ExecutionError::into_arrow_external_error)?; + .map_err(DataFusionError::into_arrow_external_error)?; RecordBatch::try_new(Arc::new(schema.to_owned()), columns) } diff --git a/rust/datafusion/src/physical_plan/csv.rs b/rust/datafusion/src/physical_plan/csv.rs index c94f0407faf..ba40ddd94b3 100644 --- a/rust/datafusion/src/physical_plan/csv.rs +++ b/rust/datafusion/src/physical_plan/csv.rs @@ -23,7 +23,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use crate::error::{ExecutionError, Result}; +use crate::error::{DataFusionError, Result}; use crate::physical_plan::ExecutionPlan; use crate::physical_plan::{common, Partitioning}; use arrow::csv; @@ -145,7 +145,7 @@ impl CsvExec { let mut filenames: Vec = vec![]; common::build_file_list(path, &mut filenames, file_extension.as_str())?; if filenames.is_empty() { - return Err(ExecutionError::General("No files found".to_string())); + return Err(DataFusionError::Execution("No files found".to_string())); } let schema = match options.schema { @@ -214,7 +214,7 @@ impl ExecutionPlan for CsvExec { if children.is_empty() { Ok(Arc::new(self.clone())) } else { - Err(ExecutionError::General(format!( + Err(DataFusionError::Internal(format!( "Children cannot be replaced in {:?}", self ))) diff --git a/rust/datafusion/src/physical_plan/datetime_expressions.rs b/rust/datafusion/src/physical_plan/datetime_expressions.rs index 38266749cf8..9eba91fdff5 100644 --- a/rust/datafusion/src/physical_plan/datetime_expressions.rs +++ b/rust/datafusion/src/physical_plan/datetime_expressions.rs @@ -19,7 +19,7 @@ use std::sync::Arc; -use crate::error::{ExecutionError, Result}; +use crate::error::{DataFusionError, Result}; use arrow::{ array::{Array, ArrayData, ArrayRef, StringArray, TimestampNanosecondArray}, buffer::Buffer, @@ -136,7 +136,7 @@ fn string_to_timestamp_nanos(s: &str) -> Result { // strings and we don't know which the user was trying to // match. 
Ths any of the specific error messages is likely to be // be more confusing than helpful - Err(ExecutionError::General(format!( + Err(DataFusionError::Execution(format!( "Error parsing '{}' as timestamp", s ))) @@ -148,7 +148,7 @@ fn naive_datetime_to_timestamp(s: &str, datetime: NaiveDateTime) -> Result let l = Local {}; match l.from_local_datetime(&datetime) { - LocalResult::None => Err(ExecutionError::General(format!( + LocalResult::None => Err(DataFusionError::Execution(format!( "Error parsing '{}' as timestamp: local time representation is invalid", s ))), @@ -174,8 +174,8 @@ pub fn to_timestamp(args: &[ArrayRef]) -> Result { .as_any() .downcast_ref::() .ok_or_else(|| { - ExecutionError::General(format!( - "Internal error: could not cast to_timestamp input to StringArray" + DataFusionError::Internal(format!( + "could not cast to_timestamp input to StringArray" )) })?; diff --git a/rust/datafusion/src/physical_plan/distinct_expressions.rs b/rust/datafusion/src/physical_plan/distinct_expressions.rs index cc771078609..c2183ca3b66 100644 --- a/rust/datafusion/src/physical_plan/distinct_expressions.rs +++ b/rust/datafusion/src/physical_plan/distinct_expressions.rs @@ -26,7 +26,7 @@ use arrow::datatypes::{DataType, Field}; use fnv::FnvHashSet; -use crate::error::{ExecutionError, Result}; +use crate::error::{DataFusionError, Result}; use crate::physical_plan::group_scalar::GroupByScalar; use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr}; use crate::scalar::ScalarValue; @@ -131,7 +131,7 @@ impl Accumulator for DistinctCountAccumulator { .iter() .map(|state| match state { ScalarValue::List(Some(values), _) => Ok(values), - _ => Err(ExecutionError::InternalError( + _ => Err(DataFusionError::Internal( "Unexpected accumulator state".to_string(), )), }) @@ -178,7 +178,7 @@ impl Accumulator for DistinctCountAccumulator { match &self.count_data_type { DataType::UInt64 => Ok(ScalarValue::UInt64(Some(self.values.len() as u64))), t => { - return 
Err(ExecutionError::InternalError(format!( + return Err(DataFusionError::Internal(format!( "Invalid data type {:?} for count distinct aggregation", t ))) diff --git a/rust/datafusion/src/physical_plan/empty.rs b/rust/datafusion/src/physical_plan/empty.rs index 0d96479da90..ecdbeba9eea 100644 --- a/rust/datafusion/src/physical_plan/empty.rs +++ b/rust/datafusion/src/physical_plan/empty.rs @@ -20,7 +20,7 @@ use std::any::Any; use std::sync::Arc; -use crate::error::{ExecutionError, Result}; +use crate::error::{DataFusionError, Result}; use crate::physical_plan::memory::MemoryStream; use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning}; use arrow::datatypes::SchemaRef; @@ -72,7 +72,7 @@ impl ExecutionPlan for EmptyExec { ) -> Result> { match children.len() { 0 => Ok(Arc::new(EmptyExec::new(self.schema.clone()))), - _ => Err(ExecutionError::General( + _ => Err(DataFusionError::Internal( "EmptyExec wrong number of children".to_string(), )), } @@ -81,7 +81,7 @@ impl ExecutionPlan for EmptyExec { async fn execute(&self, partition: usize) -> Result { // GlobalLimitExec has a single output partition if 0 != partition { - return Err(ExecutionError::General(format!( + return Err(DataFusionError::Internal(format!( "EmptyExec invalid partition {} (expected 0)", partition ))); diff --git a/rust/datafusion/src/physical_plan/explain.rs b/rust/datafusion/src/physical_plan/explain.rs index 4a46ad65ec3..40cd7065ba2 100644 --- a/rust/datafusion/src/physical_plan/explain.rs +++ b/rust/datafusion/src/physical_plan/explain.rs @@ -20,7 +20,7 @@ use std::any::Any; use std::sync::Arc; -use crate::error::{ExecutionError, Result}; +use crate::error::{DataFusionError, Result}; use crate::{ logical_plan::StringifiedPlan, physical_plan::{common::SizedRecordBatchStream, ExecutionPlan}, @@ -82,7 +82,7 @@ impl ExecutionPlan for ExplainExec { if children.is_empty() { Ok(Arc::new(self.clone())) } else { - Err(ExecutionError::General(format!( + Err(DataFusionError::Internal(format!( 
"Children cannot be replaced in {:?}", self ))) @@ -91,7 +91,7 @@ impl ExecutionPlan for ExplainExec { async fn execute(&self, partition: usize) -> Result { if 0 != partition { - return Err(ExecutionError::General(format!( + return Err(DataFusionError::Internal(format!( "ExplainExec invalid partition {}", partition ))); diff --git a/rust/datafusion/src/physical_plan/expressions.rs b/rust/datafusion/src/physical_plan/expressions.rs index e9bbe193ff3..3240fe906a1 100644 --- a/rust/datafusion/src/physical_plan/expressions.rs +++ b/rust/datafusion/src/physical_plan/expressions.rs @@ -21,7 +21,7 @@ use std::convert::TryFrom; use std::fmt; use std::sync::Arc; -use crate::error::{ExecutionError, Result}; +use crate::error::{DataFusionError, Result}; use crate::logical_plan::Operator; use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr}; use crate::scalar::ScalarValue; @@ -122,7 +122,7 @@ pub fn sum_return_type(arg_type: &DataType) -> Result { } DataType::Float32 => Ok(DataType::Float32), DataType::Float64 => Ok(DataType::Float64), - other => Err(ExecutionError::General(format!( + other => Err(DataFusionError::Plan(format!( "SUM does not support type \"{:?}\"", other ))), @@ -204,7 +204,7 @@ fn sum_batch(values: &ArrayRef) -> Result { DataType::UInt16 => typed_sum_delta_batch!(values, UInt16Array, UInt16), DataType::UInt8 => typed_sum_delta_batch!(values, UInt8Array, UInt8), e => { - return Err(ExecutionError::InternalError(format!( + return Err(DataFusionError::Internal(format!( "Sum is not expected to receive the type {:?}", e ))) @@ -288,7 +288,7 @@ fn sum(lhs: &ScalarValue, rhs: &ScalarValue) -> Result { typed_sum!(lhs, rhs, Int64, i64) } e => { - return Err(ExecutionError::InternalError(format!( + return Err(DataFusionError::Internal(format!( "Sum is not expected to receive a scalar {:?}", e ))) @@ -350,7 +350,7 @@ pub fn avg_return_type(arg_type: &DataType) -> Result { | DataType::UInt64 | DataType::Float32 | DataType::Float64 => 
Ok(DataType::Float64), - other => Err(ExecutionError::General(format!( + other => Err(DataFusionError::Plan(format!( "AVG does not support {:?}", other ))), @@ -470,7 +470,7 @@ impl Accumulator for AvgAccumulator { Some(f) => Some(f / self.count as f64), None => None, })), - _ => Err(ExecutionError::InternalError( + _ => Err(DataFusionError::Internal( "Sum should be f64 on average".to_string(), )), } @@ -564,10 +564,11 @@ macro_rules! min_max_batch { DataType::UInt16 => typed_min_max_batch!($VALUES, UInt16Array, UInt16, $OP), DataType::UInt8 => typed_min_max_batch!($VALUES, UInt8Array, UInt8, $OP), other => { - return Err(ExecutionError::NotImplemented(format!( + // This should have been handled before + return Err(DataFusionError::Internal(format!( "Min/Max accumulator not implemented for type {:?}", other - ))) + ))); } } }}; @@ -664,7 +665,7 @@ macro_rules! min_max { typed_min_max_string!(lhs, rhs, LargeUtf8, $OP) } e => { - return Err(ExecutionError::InternalError(format!( + return Err(DataFusionError::Internal(format!( "MIN/MAX is not expected to receive a scalar {:?}", e ))) @@ -888,7 +889,7 @@ impl CountAccumulator { (ScalarValue::UInt64(Some(lhs)), None) => Some(lhs.clone()), (ScalarValue::UInt64(Some(lhs)), Some(rhs)) => Some(lhs + rhs), _ => { - return Err(ExecutionError::InternalError( + return Err(DataFusionError::Internal( "Code should not be reached reach".to_string(), )) } @@ -916,7 +917,7 @@ impl Accumulator for CountAccumulator { // value is null => no change in count (e, true) => e.clone(), (_, false) => { - return Err(ExecutionError::InternalError( + return Err(DataFusionError::Internal( "Count is always of type u64".to_string(), )) } @@ -982,7 +983,7 @@ macro_rules! 
binary_string_array_op { ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{ match $LEFT.data_type() { DataType::Utf8 => compute_utf8_op!($LEFT, $RIGHT, $OP, StringArray), - other => Err(ExecutionError::General(format!( + other => Err(DataFusionError::Internal(format!( "Unsupported data type {:?}", other ))), @@ -1006,7 +1007,7 @@ macro_rules! binary_primitive_array_op { DataType::UInt64 => compute_op!($LEFT, $RIGHT, $OP, UInt64Array), DataType::Float32 => compute_op!($LEFT, $RIGHT, $OP, Float32Array), DataType::Float64 => compute_op!($LEFT, $RIGHT, $OP, Float64Array), - other => Err(ExecutionError::General(format!( + other => Err(DataFusionError::Internal(format!( "Unsupported data type {:?}", other ))), @@ -1033,7 +1034,7 @@ macro_rules! binary_array_op { DataType::Timestamp(TimeUnit::Nanosecond, None) => { compute_op!($LEFT, $RIGHT, $OP, TimestampNanosecondArray) } - other => Err(ExecutionError::General(format!( + other => Err(DataFusionError::Internal(format!( "Unsupported data type {:?}", other ))), @@ -1229,7 +1230,7 @@ fn common_binary_type( numerical_coercion(lhs_type, rhs_type) } Operator::Modulus => { - return Err(ExecutionError::NotImplemented( + return Err(DataFusionError::NotImplemented( "Modulus operator is still not supported".to_string(), )) } @@ -1237,7 +1238,7 @@ fn common_binary_type( // re-write the error message of failed coercions to include the operator's information match result { - None => Err(ExecutionError::General( + None => Err(DataFusionError::Plan( format!( "'{:?} {} {:?}' can't be evaluated because there isn't a common type to coerce the types to", lhs_type, op, rhs_type @@ -1277,7 +1278,7 @@ pub fn binary_operator_data_type( Operator::Plus | Operator::Minus | Operator::Divide | Operator::Multiply => { Ok(common_type) } - Operator::Modulus => Err(ExecutionError::NotImplemented( + Operator::Modulus => Err(DataFusionError::NotImplemented( "Modulus operator is still not supported".to_string(), )), } @@ -1319,7 +1320,8 @@ impl PhysicalExpr for 
BinaryExpr { let left = self.left.evaluate(batch)?; let right = self.right.evaluate(batch)?; if left.data_type() != right.data_type() { - return Err(ExecutionError::General(format!( + // this should have been captured during planning + return Err(DataFusionError::Internal(format!( "Cannot evaluate binary expression {:?} with types {:?} and {:?}", self.op, left.data_type(), @@ -1343,7 +1345,7 @@ impl PhysicalExpr for BinaryExpr { if left.data_type() == &DataType::Boolean { boolean_op!(left, right, and) } else { - return Err(ExecutionError::General(format!( + return Err(DataFusionError::Internal(format!( "Cannot evaluate binary expression {:?} with types {:?} and {:?}", self.op, left.data_type(), @@ -1355,7 +1357,7 @@ impl PhysicalExpr for BinaryExpr { if left.data_type() == &DataType::Boolean { boolean_op!(left, right, or) } else { - return Err(ExecutionError::General(format!( + return Err(DataFusionError::Internal(format!( "Cannot evaluate binary expression {:?} with types {:?} and {:?}", self.op, left.data_type(), @@ -1363,7 +1365,7 @@ impl PhysicalExpr for BinaryExpr { ))); } } - Operator::Modulus => Err(ExecutionError::NotImplemented( + Operator::Modulus => Err(DataFusionError::NotImplemented( "Modulus operator is still not supported".to_string(), )), } @@ -1431,7 +1433,7 @@ pub fn not( ) -> Result> { let data_type = arg.data_type(input_schema)?; if data_type != DataType::Boolean { - Err(ExecutionError::General( + Err(DataFusionError::Internal( format!( "NOT '{:?}' can't be evaluated because the expression's type is {:?}, not boolean", arg, data_type, @@ -1574,7 +1576,7 @@ pub fn cast( } else if can_cast_types(&expr_type, &cast_type) { Ok(Arc::new(CastExpr { expr, cast_type })) } else { - Err(ExecutionError::General(format!( + Err(DataFusionError::Internal(format!( "Unsupported CAST from {:?} to {:?}", expr_type, cast_type ))) @@ -1662,7 +1664,7 @@ impl PhysicalExpr for Literal { StringBuilder, value.as_ref().and_then(|e| Some(&*e)) ), - other => 
Err(ExecutionError::General(format!( + other => Err(DataFusionError::Internal(format!( "Unsupported literal type {:?}", other ))), @@ -2021,12 +2023,12 @@ mod tests { let expr = common_binary_type(&DataType::Float32, &Operator::Plus, &DataType::Utf8); - if let Err(ExecutionError::General(e)) = expr { + if let Err(DataFusionError::Plan(e)) = expr { assert_eq!(e, "'Float32 + Utf8' can't be evaluated because there isn't a common type to coerce the types to"); Ok(()) } else { - Err(ExecutionError::General( - "Coercion should have returned an ExecutionError::General".to_string(), + Err(DataFusionError::Internal( + "Coercion should have returned a DataFusionError::Internal".to_string(), )) } } diff --git a/rust/datafusion/src/physical_plan/filter.rs b/rust/datafusion/src/physical_plan/filter.rs index e4ea3df00a0..4a61d7d9dac 100644 --- a/rust/datafusion/src/physical_plan/filter.rs +++ b/rust/datafusion/src/physical_plan/filter.rs @@ -24,7 +24,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use super::{RecordBatchStream, SendableRecordBatchStream}; -use crate::error::{ExecutionError, Result}; +use crate::error::{DataFusionError, Result}; use crate::physical_plan::{ExecutionPlan, Partitioning, PhysicalExpr}; use arrow::array::BooleanArray; use arrow::compute::filter; @@ -57,7 +57,7 @@ impl FilterExec { predicate: predicate.clone(), input: input.clone(), }), - other => Err(ExecutionError::General(format!( + other => Err(DataFusionError::Plan(format!( "Filter predicate must return boolean values, not {:?}", other ))), @@ -96,7 +96,7 @@ impl ExecutionPlan for FilterExec { self.predicate.clone(), children[0].clone(), )?)), - _ => Err(ExecutionError::General( + _ => Err(DataFusionError::Internal( "FilterExec wrong number of children".to_string(), )), } @@ -128,13 +128,13 @@ fn batch_filter( ) -> ArrowResult { predicate .evaluate(&batch) - .map_err(ExecutionError::into_arrow_external_error) + .map_err(DataFusionError::into_arrow_external_error) .and_then(|array| {
array .as_any() .downcast_ref::() .ok_or( - ExecutionError::InternalError( + DataFusionError::Internal( "Filter predicate evaluated to non-boolean value".to_string(), ) .into_arrow_external_error(), diff --git a/rust/datafusion/src/physical_plan/functions.rs b/rust/datafusion/src/physical_plan/functions.rs index 7b2fd593519..848184f2f97 100644 --- a/rust/datafusion/src/physical_plan/functions.rs +++ b/rust/datafusion/src/physical_plan/functions.rs @@ -33,7 +33,7 @@ use super::{ type_coercion::{coerce, data_types}, PhysicalExpr, }; -use crate::error::{ExecutionError, Result}; +use crate::error::{DataFusionError, Result}; use crate::physical_plan::array_expressions; use crate::physical_plan::datetime_expressions; use crate::physical_plan::math_expressions; @@ -131,7 +131,7 @@ impl fmt::Display for BuiltinScalarFunction { } impl FromStr for BuiltinScalarFunction { - type Err = ExecutionError; + type Err = DataFusionError; fn from_str(name: &str) -> Result { Ok(match name { "sqrt" => BuiltinScalarFunction::Sqrt, @@ -156,7 +156,7 @@ impl FromStr for BuiltinScalarFunction { "to_timestamp" => BuiltinScalarFunction::ToTimestamp, "array" => BuiltinScalarFunction::Array, _ => { - return Err(ExecutionError::General(format!( + return Err(DataFusionError::Plan(format!( "There is no built-in function named {}", name ))) @@ -179,7 +179,7 @@ pub fn return_type( if arg_types.len() == 0 { // functions currently cannot be evaluated without arguments, as they can't // know the number of rows to return. - return Err(ExecutionError::General( + return Err(DataFusionError::Plan( format!("Function '{}' requires at least one argument", fun).to_string(), )); } @@ -193,7 +193,7 @@ pub fn return_type( DataType::Utf8 => DataType::Int32, _ => { // this error is internal as `data_types` should have captured this. 
- return Err(ExecutionError::InternalError( + return Err(DataFusionError::Internal( "The length function can only accept strings.".to_string(), )); } @@ -446,7 +446,7 @@ mod tests { fn test_concat_error() -> Result<()> { let result = return_type(&BuiltinScalarFunction::Concat, &vec![]); if let Ok(_) = result { - Err(ExecutionError::General( + Err(DataFusionError::Plan( "Function 'concat' cannot accept zero arguments".to_string(), )) } else { diff --git a/rust/datafusion/src/physical_plan/group_scalar.rs b/rust/datafusion/src/physical_plan/group_scalar.rs index 6647df946c9..bb1e204c7f5 100644 --- a/rust/datafusion/src/physical_plan/group_scalar.rs +++ b/rust/datafusion/src/physical_plan/group_scalar.rs @@ -19,7 +19,7 @@ use std::convert::{From, TryFrom}; -use crate::error::{ExecutionError, Result}; +use crate::error::{DataFusionError, Result}; use crate::scalar::ScalarValue; /// Enumeration of types that can be used in a GROUP BY expression (all primitives except @@ -38,7 +38,7 @@ pub(crate) enum GroupByScalar { } impl TryFrom<&ScalarValue> for GroupByScalar { - type Error = ExecutionError; + type Error = DataFusionError; fn try_from(scalar_value: &ScalarValue) -> Result { Ok(match scalar_value { @@ -60,13 +60,13 @@ impl TryFrom<&ScalarValue> for GroupByScalar { | ScalarValue::UInt32(None) | ScalarValue::UInt64(None) | ScalarValue::Utf8(None) => { - return Err(ExecutionError::InternalError(format!( + return Err(DataFusionError::Internal(format!( "Cannot convert a ScalarValue holding NULL ({:?})", scalar_value ))); } v => { - return Err(ExecutionError::InternalError(format!( + return Err(DataFusionError::Internal(format!( "Cannot convert a ScalarValue with associated DataType {:?}", v.get_datatype() ))) @@ -95,7 +95,7 @@ impl From<&GroupByScalar> for ScalarValue { mod tests { use super::*; - use crate::error::{ExecutionError, Result}; + use crate::error::{DataFusionError, Result}; #[test] fn from_scalar_holding_none() -> Result<()> { @@ -103,7 +103,7 @@ mod tests { 
let result = GroupByScalar::try_from(&scalar_value); match result { - Err(ExecutionError::InternalError(error_message)) => assert_eq!( + Err(DataFusionError::Internal(error_message)) => assert_eq!( error_message, String::from("Cannot convert a ScalarValue holding NULL (Int8(NULL))") ), @@ -120,7 +120,7 @@ mod tests { let result = GroupByScalar::try_from(&scalar_value); match result { - Err(ExecutionError::InternalError(error_message)) => assert_eq!( + Err(DataFusionError::Internal(error_message)) => assert_eq!( error_message, String::from( "Cannot convert a ScalarValue with associated DataType Float32" diff --git a/rust/datafusion/src/physical_plan/hash_aggregate.rs b/rust/datafusion/src/physical_plan/hash_aggregate.rs index 9329fccb277..0476a9bb837 100644 --- a/rust/datafusion/src/physical_plan/hash_aggregate.rs +++ b/rust/datafusion/src/physical_plan/hash_aggregate.rs @@ -24,7 +24,7 @@ use std::task::{Context, Poll}; use futures::stream::{Stream, StreamExt, TryStreamExt}; use futures::FutureExt; -use crate::error::{ExecutionError, Result}; +use crate::error::{DataFusionError, Result}; use crate::physical_plan::{Accumulator, AggregateExpr}; use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning, PhysicalExpr}; @@ -183,7 +183,7 @@ impl ExecutionPlan for HashAggregateExec { self.aggr_expr.clone(), children[0].clone(), )?)), - _ => Err(ExecutionError::General( + _ => Err(DataFusionError::Internal( "HashAggregateExec wrong number of children".to_string(), )), } @@ -254,13 +254,13 @@ fn group_aggregate_batch( for row in 0..batch.num_rows() { // 1.1 create_key(&group_values, row, &mut key) - .map_err(ExecutionError::into_arrow_external_error)?; + .map_err(DataFusionError::into_arrow_external_error)?; match accumulators.get_mut(&key) { // 1.2 None => { let accumulator_set = create_accumulators(aggr_expr) - .map_err(ExecutionError::into_arrow_external_error)?; + .map_err(DataFusionError::into_arrow_external_error)?; accumulators .insert(key.clone(), 
(accumulator_set, Box::new(vec![row as u32]))); @@ -362,9 +362,9 @@ impl Stream for GroupedHashAggregateStream { let aggregate_expressions = match aggregate_expressions(&aggr_expr, &mode) { Ok(e) => e, Err(e) => { - return Poll::Ready(Some(Err(ExecutionError::into_arrow_external_error( - e, - )))) + return Poll::Ready(Some(Err( + DataFusionError::into_arrow_external_error(e), + ))) } }; @@ -386,7 +386,7 @@ impl Stream for GroupedHashAggregateStream { accumulators, &aggregate_expressions, ) - .map_err(ExecutionError::into_arrow_external_error) + .map_err(DataFusionError::into_arrow_external_error) }, ); @@ -541,18 +541,18 @@ impl Stream for HashAggregateStream { let accumulators = match create_accumulators(&self.aggr_expr) { Ok(e) => e, Err(e) => { - return Poll::Ready(Some(Err(ExecutionError::into_arrow_external_error( - e, - )))) + return Poll::Ready(Some(Err( + DataFusionError::into_arrow_external_error(e), + ))) } }; let expressions = match aggregate_expressions(&self.aggr_expr, &self.mode) { Ok(e) => e, Err(e) => { - return Poll::Ready(Some(Err(ExecutionError::into_arrow_external_error( - e, - )))) + return Poll::Ready(Some(Err( + DataFusionError::into_arrow_external_error(e), + ))) } }; let expressions = Arc::new(expressions); @@ -570,7 +570,7 @@ impl Stream for HashAggregateStream { (accumulators, expressions), |(acc, expr), batch| async move { aggregate_batch(&mode, &batch, acc, &expr) - .map_err(ExecutionError::into_arrow_external_error) + .map_err(DataFusionError::into_arrow_external_error) .map(|agg| (agg, expr)) }, ) @@ -581,7 +581,7 @@ impl Stream for HashAggregateStream { maybe_accumulators.map(|accumulators| { // 2. convert values to a record batch finalize_aggregation(&accumulators, &mode) - .map_err(ExecutionError::into_arrow_external_error) + .map_err(DataFusionError::into_arrow_external_error) .and_then(|columns| RecordBatch::try_new(schema.clone(), columns)) })? }); @@ -642,7 +642,7 @@ fn create_batch_from_map( // 3. 
groups.extend( finalize_aggregation(accumulator_set, mode) - .map_err(ExecutionError::into_arrow_external_error)?, + .map_err(DataFusionError::into_arrow_external_error)?, ); Ok(groups) @@ -745,9 +745,10 @@ fn create_key( vec[i] = GroupByScalar::Utf8(String::from(array.value(row))) } _ => { - return Err(ExecutionError::ExecutionError( + // This is internal because we should have caught this before. + return Err(DataFusionError::Internal( "Unsupported GROUP BY data type".to_string(), - )) + )); } } } diff --git a/rust/datafusion/src/physical_plan/limit.rs b/rust/datafusion/src/physical_plan/limit.rs index 570e4d48801..b685ca43b6a 100644 --- a/rust/datafusion/src/physical_plan/limit.rs +++ b/rust/datafusion/src/physical_plan/limit.rs @@ -20,7 +20,7 @@ use std::any::Any; use std::sync::Arc; -use crate::error::{ExecutionError, Result}; +use crate::error::{DataFusionError, Result}; use crate::physical_plan::memory::MemoryStream; use crate::physical_plan::{Distribution, ExecutionPlan, Partitioning}; use arrow::array::ArrayRef; @@ -89,7 +89,7 @@ impl ExecutionPlan for GlobalLimitExec { self.limit, self.concurrency, ))), - _ => Err(ExecutionError::General( + _ => Err(DataFusionError::Internal( "GlobalLimitExec wrong number of children".to_string(), )), } @@ -98,7 +98,7 @@ impl ExecutionPlan for GlobalLimitExec { async fn execute(&self, partition: usize) -> Result { // GlobalLimitExec has a single output partition if 0 != partition { - return Err(ExecutionError::General(format!( + return Err(DataFusionError::Internal(format!( "GlobalLimitExec invalid partition {}", partition ))); @@ -106,7 +106,7 @@ impl ExecutionPlan for GlobalLimitExec { // GlobalLimitExec requires a single input partition if 1 != self.input.output_partitioning().partition_count() { - return Err(ExecutionError::General( + return Err(DataFusionError::Internal( "GlobalLimitExec requires a single input partition".to_owned(), )); } @@ -162,7 +162,7 @@ impl ExecutionPlan for LocalLimitExec { 
children[0].clone(), self.limit, ))), - _ => Err(ExecutionError::General( + _ => Err(DataFusionError::Internal( "LocalLimitExec wrong number of children".to_string(), )), } @@ -181,7 +181,7 @@ impl ExecutionPlan for LocalLimitExec { /// Truncate a RecordBatch to maximum of n rows pub fn truncate_batch(batch: &RecordBatch, n: usize) -> Result { let limited_columns: Result> = (0..batch.num_columns()) - .map(|i| limit(batch.column(i), n).map_err(|error| ExecutionError::from(error))) + .map(|i| limit(batch.column(i), n).map_err(|error| DataFusionError::from(error))) .collect(); Ok(RecordBatch::try_new( @@ -216,7 +216,7 @@ async fn collect_with_limit( None => { return Ok(results); } - Some(Err(e)) => return Err(ExecutionError::from(e)), + Some(Err(e)) => return Err(DataFusionError::from(e)), } } } diff --git a/rust/datafusion/src/physical_plan/math_expressions.rs b/rust/datafusion/src/physical_plan/math_expressions.rs index 32049d09988..71eda2b0de3 100644 --- a/rust/datafusion/src/physical_plan/math_expressions.rs +++ b/rust/datafusion/src/physical_plan/math_expressions.rs @@ -25,7 +25,7 @@ use arrow::array::{ use arrow::buffer::Buffer; use arrow::datatypes::{DataType, ToByteSlice}; -use crate::error::{ExecutionError, Result}; +use crate::error::{DataFusionError, Result}; macro_rules! compute_op { ($ARRAY:expr, $FUNC:ident, $TYPE:ident) => {{ @@ -51,7 +51,7 @@ macro_rules! downcast_compute_op { let n = $ARRAY.as_any().downcast_ref::<$TYPE>(); match n { Some(array) => compute_op!(array, $FUNC, $TYPE), - _ => Err(ExecutionError::General(format!( + _ => Err(DataFusionError::Internal(format!( "Invalid data type for {}", $NAME ))), @@ -64,7 +64,7 @@ macro_rules! 
unary_primitive_array_op { match ($ARRAY).data_type() { DataType::Float32 => downcast_compute_op!($ARRAY, $NAME, $FUNC, Float32Array), DataType::Float64 => downcast_compute_op!($ARRAY, $NAME, $FUNC, Float64Array), - other => Err(ExecutionError::General(format!( + other => Err(DataFusionError::Internal(format!( "Unsupported data type {:?} for function {}", other, $NAME, ))), diff --git a/rust/datafusion/src/physical_plan/memory.rs b/rust/datafusion/src/physical_plan/memory.rs index a3fc7337aa7..e7778f9d564 100644 --- a/rust/datafusion/src/physical_plan/memory.rs +++ b/rust/datafusion/src/physical_plan/memory.rs @@ -22,7 +22,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use super::{ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream}; -use crate::error::{ExecutionError, Result}; +use crate::error::{DataFusionError, Result}; use arrow::datatypes::SchemaRef; use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; @@ -67,7 +67,7 @@ impl ExecutionPlan for MemoryExec { &self, _: Vec>, ) -> Result> { - Err(ExecutionError::General(format!( + Err(DataFusionError::Internal(format!( "Children cannot be replaced in {:?}", self ))) diff --git a/rust/datafusion/src/physical_plan/merge.rs b/rust/datafusion/src/physical_plan/merge.rs index 86285511e4f..f50bfee92ba 100644 --- a/rust/datafusion/src/physical_plan/merge.rs +++ b/rust/datafusion/src/physical_plan/merge.rs @@ -25,7 +25,7 @@ use std::sync::Arc; use futures::future; use super::common; -use crate::error::{ExecutionError, Result}; +use crate::error::{DataFusionError, Result}; use crate::physical_plan::ExecutionPlan; use crate::physical_plan::Partitioning; @@ -78,7 +78,7 @@ impl ExecutionPlan for MergeExec { ) -> Result> { match children.len() { 1 => Ok(Arc::new(MergeExec::new(children[0].clone()))), - _ => Err(ExecutionError::General( + _ => Err(DataFusionError::Internal( "MergeExec wrong number of children".to_string(), )), } @@ -87,7 +87,7 @@ impl ExecutionPlan for 
MergeExec { async fn execute(&self, partition: usize) -> Result { // MergeExec produces a single partition if 0 != partition { - return Err(ExecutionError::General(format!( + return Err(DataFusionError::Internal(format!( "MergeExec invalid partition {}", partition ))); @@ -95,7 +95,7 @@ impl ExecutionPlan for MergeExec { let input_partitions = self.input.output_partitioning().partition_count(); match input_partitions { - 0 => Err(ExecutionError::General( + 0 => Err(DataFusionError::Internal( "MergeExec requires at least one input partition".to_owned(), )), 1 => { diff --git a/rust/datafusion/src/physical_plan/parquet.rs b/rust/datafusion/src/physical_plan/parquet.rs index 946f015ed1a..85a2346f306 100644 --- a/rust/datafusion/src/physical_plan/parquet.rs +++ b/rust/datafusion/src/physical_plan/parquet.rs @@ -25,7 +25,7 @@ use std::task::{Context, Poll}; use std::{fmt, thread}; use super::{RecordBatchStream, SendableRecordBatchStream}; -use crate::error::{ExecutionError, Result}; +use crate::error::{DataFusionError, Result}; use crate::physical_plan::ExecutionPlan; use crate::physical_plan::{common, Partitioning}; use arrow::datatypes::{Schema, SchemaRef}; @@ -63,7 +63,7 @@ impl ParquetExec { let mut filenames: Vec = vec![]; common::build_file_list(path, &mut filenames, ".parquet")?; if filenames.is_empty() { - Err(ExecutionError::General("No files found".to_string())) + Err(DataFusionError::Plan("No files found".to_string())) } else { let file = File::open(&filenames[0])?; let file_reader = Rc::new(SerializedFileReader::new(file)?); @@ -120,7 +120,7 @@ impl ExecutionPlan for ParquetExec { if children.is_empty() { Ok(Arc::new(self.clone())) } else { - Err(ExecutionError::General(format!( + Err(DataFusionError::Internal(format!( "Children cannot be replaced in {:?}", self ))) @@ -158,7 +158,7 @@ fn send_result( ) -> Result<()> { response_tx .send(result) - .map_err(|e| ExecutionError::ExecutionError(e.to_string()))?; + .map_err(|e| 
DataFusionError::Execution(e.to_string()))?; Ok(()) } @@ -190,7 +190,7 @@ fn read_file( Some(Err(ArrowError::ParquetError(err_msg.clone()))), )?; // terminate thread with error - return Err(ExecutionError::ExecutionError(err_msg)); + return Err(DataFusionError::Execution(err_msg)); } } } diff --git a/rust/datafusion/src/physical_plan/planner.rs b/rust/datafusion/src/physical_plan/planner.rs index cfc8858841a..bb88b1f49b5 100644 --- a/rust/datafusion/src/physical_plan/planner.rs +++ b/rust/datafusion/src/physical_plan/planner.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use super::{aggregates, empty::EmptyExec, expressions::binary, functions, udaf}; -use crate::error::{ExecutionError, Result}; +use crate::error::{DataFusionError, Result}; use crate::execution::context::ExecutionContextState; use crate::logical_plan::{ Expr, LogicalPlan, PlanType, StringifiedPlan, UserDefinedLogicalNode, @@ -141,9 +141,10 @@ impl DefaultPhysicalPlanner { .. } => match ctx_state.datasources.get(table_name) { Some(provider) => provider.scan(projection, batch_size), - _ => Err(ExecutionError::General(format!( - "No table named {}", - table_name + _ => Err(DataFusionError::Plan(format!( + "No table named {}. Existing tables: {:?}", + table_name, + ctx_state.datasources.keys().collect::>(), ))), }, LogicalPlan::InMemoryScan { @@ -269,7 +270,7 @@ impl DefaultPhysicalPlanner { }, ctx_state, ), - _ => Err(ExecutionError::ExecutionError( + _ => Err(DataFusionError::Plan( "Sort only accepts sort expressions".to_string(), )), }) @@ -308,7 +309,7 @@ impl DefaultPhysicalPlanner { // TABLE" -- it must be handled at a higher level (so // that the appropriate table can be registered with // the context) - Err(ExecutionError::General( + Err(DataFusionError::Internal( "Unsupported logical plan: CreateExternalTable".to_string(), )) } @@ -353,7 +354,7 @@ impl DefaultPhysicalPlanner { // declared logical schema to catch and warn about // logic errors when creating user defined plans. 
if plan.schema() != *node.schema() { - Err(ExecutionError::General(format!( + Err(DataFusionError::Plan(format!( "Extension planner for {:?} created an ExecutionPlan with mismatched schema. \ LogicalPlan schema: {:?}, ExecutionPlan schema: {:?}", node, node.schema(), plan.schema() @@ -390,7 +391,7 @@ impl DefaultPhysicalPlanner { provider.get_value(variable_names.clone())?; Ok(Arc::new(Literal::new(scalar_value))) } - _ => Err(ExecutionError::General(format!( + _ => Err(DataFusionError::Plan(format!( "No system variable provider found" ))), } @@ -401,7 +402,7 @@ impl DefaultPhysicalPlanner { provider.get_value(variable_names.clone())?; Ok(Arc::new(Literal::new(scalar_value))) } - _ => Err(ExecutionError::General(format!( + _ => Err(DataFusionError::Plan(format!( "No user defined variable provider found" ))), } @@ -452,7 +453,7 @@ impl DefaultPhysicalPlanner { input_schema, ) } - other => Err(ExecutionError::NotImplemented(format!( + other => Err(DataFusionError::NotImplemented(format!( "Physical plan does not support logical expression {:?}", other ))), @@ -499,7 +500,7 @@ impl DefaultPhysicalPlanner { udaf::create_aggregate_expr(fun, &args, input_schema, name) } - other => Err(ExecutionError::General(format!( + other => Err(DataFusionError::Internal(format!( "Invalid aggregate expression '{:?}'", other ))), @@ -539,7 +540,7 @@ impl ExtensionPlanner for DefaultExtensionPlanner { _inputs: Vec>, _ctx_state: &ExecutionContextState, ) -> Result> { - Err(ExecutionError::NotImplemented(format!( + Err(DataFusionError::NotImplemented(format!( "DefaultPhysicalPlanner does not know how to plan {:?}. \ Provide a custom ExtensionPlanNodePlanner that does", node @@ -660,12 +661,11 @@ mod tests { for case in cases { let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)? 
.project(vec![case.clone()]); - if let Ok(_) = logical_plan { - return Err(ExecutionError::General(format!( - "Expression {:?} expected to error due to impossible coercion", - case - ))); - }; + let message = format!( + "Expression {:?} expected to error due to impossible coercion", + case + ); + assert!(logical_plan.is_err(), message); } Ok(()) } diff --git a/rust/datafusion/src/physical_plan/projection.rs b/rust/datafusion/src/physical_plan/projection.rs index 895148d8681..e15a264e5c1 100644 --- a/rust/datafusion/src/physical_plan/projection.rs +++ b/rust/datafusion/src/physical_plan/projection.rs @@ -25,7 +25,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use crate::error::{ExecutionError, Result}; +use crate::error::{DataFusionError, Result}; use crate::physical_plan::{ExecutionPlan, Partitioning, PhysicalExpr}; use arrow::datatypes::{Field, Schema, SchemaRef}; use arrow::error::Result as ArrowResult; @@ -107,7 +107,7 @@ impl ExecutionPlan for ProjectionExec { self.expr.clone(), children[0].clone(), )?)), - _ => Err(ExecutionError::General( + _ => Err(DataFusionError::Internal( "ProjectionExec wrong number of children".to_string(), )), } @@ -132,7 +132,7 @@ fn batch_project( .map(|expr| expr.evaluate(&batch)) .collect::>>() .map_or_else( - |e| Err(ExecutionError::into_arrow_external_error(e)), + |e| Err(DataFusionError::into_arrow_external_error(e)), |arrays| RecordBatch::try_new(schema.clone(), arrays), ) } diff --git a/rust/datafusion/src/physical_plan/sort.rs b/rust/datafusion/src/physical_plan/sort.rs index 61d8bd85b27..0c3601a0f44 100644 --- a/rust/datafusion/src/physical_plan/sort.rs +++ b/rust/datafusion/src/physical_plan/sort.rs @@ -27,7 +27,7 @@ use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use super::SendableRecordBatchStream; -use crate::error::{ExecutionError, Result}; +use crate::error::{DataFusionError, Result}; use crate::physical_plan::common::SizedRecordBatchStream; use 
crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::{common, Distribution, ExecutionPlan, Partitioning}; @@ -94,7 +94,7 @@ impl ExecutionPlan for SortExec { children[0].clone(), self.concurrency, )?)), - _ => Err(ExecutionError::General( + _ => Err(DataFusionError::Internal( "SortExec wrong number of children".to_string(), )), } @@ -102,7 +102,7 @@ impl ExecutionPlan for SortExec { async fn execute(&self, partition: usize) -> Result { if 0 != partition { - return Err(ExecutionError::General(format!( + return Err(DataFusionError::Internal(format!( "SortExec invalid partition {}", partition ))); @@ -110,7 +110,7 @@ impl ExecutionPlan for SortExec { // sort needs to operate on a single partition currently if 1 != self.input.output_partitioning().partition_count() { - return Err(ExecutionError::General( + return Err(DataFusionError::Internal( "SortExec requires a single input partition".to_owned(), )); } diff --git a/rust/datafusion/src/physical_plan/string_expressions.rs b/rust/datafusion/src/physical_plan/string_expressions.rs index 89e06f03bc7..ea70c8db552 100644 --- a/rust/datafusion/src/physical_plan/string_expressions.rs +++ b/rust/datafusion/src/physical_plan/string_expressions.rs @@ -17,7 +17,7 @@ //! String expressions -use crate::error::{ExecutionError, Result}; +use crate::error::{DataFusionError, Result}; use arrow::array::{Array, ArrayRef, StringArray, StringBuilder}; macro_rules! downcast_vec { @@ -26,7 +26,7 @@ macro_rules! downcast_vec { .iter() .map(|e| match e.as_any().downcast_ref::<$ARRAY_TYPE>() { Some(array) => Ok(array), - _ => Err(ExecutionError::General("failed to downcast".to_string())), + _ => Err(DataFusionError::Internal("failed to downcast".to_string())), }) }}; } @@ -37,7 +37,7 @@ pub fn concatenate(args: &[ArrayRef]) -> Result { let args = downcast_vec!(args, StringArray).collect::>>()?; // do not accept 0 arguments. 
if args.len() == 0 { - return Err(ExecutionError::InternalError( + return Err(DataFusionError::Internal( "Concatenate was called with 0 arguments. It requires at least one." .to_string(), )); diff --git a/rust/datafusion/src/physical_plan/type_coercion.rs b/rust/datafusion/src/physical_plan/type_coercion.rs index bb2d1cdde8d..52226c98a85 100644 --- a/rust/datafusion/src/physical_plan/type_coercion.rs +++ b/rust/datafusion/src/physical_plan/type_coercion.rs @@ -34,7 +34,7 @@ use std::sync::Arc; use arrow::datatypes::{DataType, Schema}; use super::{functions::Signature, PhysicalExpr}; -use crate::error::{ExecutionError, Result}; +use crate::error::{DataFusionError, Result}; use crate::physical_plan::expressions::cast; /// Returns `expressions` coerced to types compatible with @@ -87,7 +87,7 @@ pub fn data_types( Signature::Exact(valid_types) => vec![valid_types.clone()], Signature::Any(number) => { if current_types.len() != *number { - return Err(ExecutionError::General(format!( + return Err(DataFusionError::Plan(format!( "The function expected {} arguments but received {}", number, current_types.len() @@ -108,7 +108,7 @@ pub fn data_types( } // none possible -> Error - Err(ExecutionError::General(format!( + Err(DataFusionError::Plan(format!( "Coercion from {:?} to the signature {:?} failed.", current_types, signature ))) @@ -347,7 +347,7 @@ mod tests { for case in cases { if let Ok(_) = coerce(&case.0, &case.1, &case.2) { - return Err(ExecutionError::General(format!( + return Err(DataFusionError::Plan(format!( "Error was expected in {:?}", case ))); diff --git a/rust/datafusion/src/scalar.rs b/rust/datafusion/src/scalar.rs index cbec200d168..6424508f1bb 100644 --- a/rust/datafusion/src/scalar.rs +++ b/rust/datafusion/src/scalar.rs @@ -33,7 +33,7 @@ use arrow::{ datatypes::DataType, }; -use crate::error::{ExecutionError, Result}; +use crate::error::{DataFusionError, Result}; /// Represents a dynamically typed, nullable single value. 
/// This is the single-valued counter-part of arrow’s `Array`. @@ -203,9 +203,7 @@ impl ScalarValue { DataType::LargeUtf8 => typed_cast!(array, index, LargeStringArray, LargeUtf8), DataType::List(nested_type) => { let list_array = array.as_any().downcast_ref::().ok_or( - ExecutionError::InternalError( - "Failed to downcast ListArray".to_string(), - ), + DataFusionError::Internal("Failed to downcast ListArray".to_string()), )?; let value = match list_array.is_null(index) { true => None, @@ -220,7 +218,7 @@ impl ScalarValue { ScalarValue::List(value, *nested_type.clone()) } other => { - return Err(ExecutionError::NotImplemented(format!( + return Err(DataFusionError::NotImplemented(format!( "Can't create a scalar of array of type \"{:?}\"", other ))) @@ -296,7 +294,7 @@ impl From for ScalarValue { } impl TryFrom<&DataType> for ScalarValue { - type Error = ExecutionError; + type Error = DataFusionError; fn try_from(datatype: &DataType) -> Result { Ok(match datatype { @@ -317,7 +315,7 @@ impl TryFrom<&DataType> for ScalarValue { ScalarValue::List(None, *nested_type.clone()) } _ => { - return Err(ExecutionError::NotImplemented(format!( + return Err(DataFusionError::NotImplemented(format!( "Can't create a scalar of type \"{:?}\"", datatype ))) diff --git a/rust/datafusion/src/sql/planner.rs b/rust/datafusion/src/sql/planner.rs index a9ec8ca15e0..fc138dfbc8e 100644 --- a/rust/datafusion/src/sql/planner.rs +++ b/rust/datafusion/src/sql/planner.rs @@ -26,7 +26,7 @@ use crate::logical_plan::{ }; use crate::scalar::ScalarValue; use crate::{ - error::{ExecutionError, Result}, + error::{DataFusionError, Result}, physical_plan::udaf::AggregateUDF, }; use crate::{ @@ -80,7 +80,7 @@ impl<'a, S: SchemaProvider> SqlToRel<'a, S> { pub fn sql_statement_to_plan(&self, sql: &Statement) -> Result { match sql { Statement::Query(query) => self.query_to_plan(&query), - _ => Err(ExecutionError::NotImplemented( + _ => Err(DataFusionError::NotImplemented( "Only SELECT statements are 
implemented".to_string(), )), } @@ -90,7 +90,7 @@ impl<'a, S: SchemaProvider> SqlToRel<'a, S> { pub fn query_to_plan(&self, query: &Query) -> Result { let plan = match &query.body { SetExpr::Select(s) => self.select_to_plan(s.as_ref()), - _ => Err(ExecutionError::NotImplemented( + _ => Err(DataFusionError::NotImplemented( format!("Query {} not implemented yet", query.body).to_owned(), )), }?; @@ -117,14 +117,14 @@ impl<'a, S: SchemaProvider> SqlToRel<'a, S> { match *file_type { FileType::CSV => { if columns.is_empty() { - return Err(ExecutionError::General( + return Err(DataFusionError::Plan( "Column definitions required for CSV files. None found".into(), )); } } FileType::Parquet => { if !columns.is_empty() { - return Err(ExecutionError::General( + return Err(DataFusionError::Plan( "Column definitions can not be specified for PARQUET files." .into(), )); @@ -200,8 +200,8 @@ impl<'a, S: SchemaProvider> SqlToRel<'a, S> { SQLDataType::Date => Ok(DataType::Date64(DateUnit::Day)), SQLDataType::Time => Ok(DataType::Time64(TimeUnit::Millisecond)), SQLDataType::Timestamp => Ok(DataType::Date64(DateUnit::Millisecond)), - _ => Err(ExecutionError::General(format!( - "Unsupported data type: {:?}.", + _ => Err(DataFusionError::NotImplemented(format!( + "The SQL data type {:?} is not implemented", sql_type ))), } @@ -212,7 +212,7 @@ impl<'a, S: SchemaProvider> SqlToRel<'a, S> { return Ok(LogicalPlanBuilder::empty().build()?); } if from.len() != 1 { - return Err(ExecutionError::NotImplemented( + return Err(DataFusionError::NotImplemented( "FROM with multiple tables is still not implemented".to_string(), )); }; @@ -228,13 +228,13 @@ impl<'a, S: SchemaProvider> SqlToRel<'a, S> { None, )? 
.build()?), - None => Err(ExecutionError::General(format!( + None => Err(DataFusionError::Plan(format!( "no schema found for table {}", name ))), } } - _ => Err(ExecutionError::NotImplemented( + _ => Err(DataFusionError::NotImplemented( "Subqueries are still not supported".to_string(), )), } @@ -243,7 +243,7 @@ impl<'a, S: SchemaProvider> SqlToRel<'a, S> { /// Generate a logic plan from an SQL select fn select_to_plan(&self, select: &Select) -> Result { if select.having.is_some() { - return Err(ExecutionError::NotImplemented( + return Err(DataFusionError::NotImplemented( "HAVING is not implemented yet".to_string(), )); } @@ -310,7 +310,7 @@ impl<'a, S: SchemaProvider> SqlToRel<'a, S> { let aggr_count = aggr_expr.len(); if group_by_count + aggr_count != projection_expr.len() { - return Err(ExecutionError::General( + return Err(DataFusionError::Plan( "Projection references non-aggregate values".to_owned(), )); } @@ -349,7 +349,7 @@ impl<'a, S: SchemaProvider> SqlToRel<'a, S> { Some(ref limit_expr) => { let n = match self.sql_to_rex(&limit_expr, &input.schema())? 
{ Expr::Literal(ScalarValue::Int64(Some(n))) => Ok(n as usize), - _ => Err(ExecutionError::General( + _ => Err(DataFusionError::Plan( "Unexpected expression for LIMIT clause".to_string(), )), }?; @@ -396,7 +396,7 @@ impl<'a, S: SchemaProvider> SqlToRel<'a, S> { alias.value.clone(), )), SelectItem::Wildcard => Ok(Expr::Wildcard), - SelectItem::QualifiedWildcard(_) => Err(ExecutionError::NotImplemented( + SelectItem::QualifiedWildcard(_) => Err(DataFusionError::NotImplemented( "Qualified wildcards are not supported".to_string(), )), } @@ -418,7 +418,7 @@ impl<'a, S: SchemaProvider> SqlToRel<'a, S> { } else { match schema.field_with_name(&id.value) { Ok(field) => Ok(Expr::Column(field.name().clone())), - Err(_) => Err(ExecutionError::ExecutionError(format!( + Err(_) => Err(DataFusionError::Plan(format!( "Invalid identifier '{}' for schema {}", id, schema.to_string() @@ -436,7 +436,7 @@ impl<'a, S: SchemaProvider> SqlToRel<'a, S> { if &var_names[0][0..1] == "@" { Ok(Expr::ScalarVariable(var_names)) } else { - Err(ExecutionError::ExecutionError(format!( + Err(DataFusionError::Plan(format!( "Invalid compound identifier '{:?}' for schema {}", var_names, schema.to_string() @@ -466,7 +466,7 @@ impl<'a, S: SchemaProvider> SqlToRel<'a, S> { UnaryOperator::Not => { Ok(Expr::Not(Box::new(self.sql_to_rex(expr, schema)?))) } - _ => Err(ExecutionError::InternalError(format!( + _ => Err(DataFusionError::Internal(format!( "SQL binary operator cannot be interpreted as a unary operator" ))), }, @@ -492,7 +492,7 @@ impl<'a, S: SchemaProvider> SqlToRel<'a, S> { BinaryOperator::Or => Ok(Operator::Or), BinaryOperator::Like => Ok(Operator::Like), BinaryOperator::NotLike => Ok(Operator::NotLike), - _ => Err(ExecutionError::NotImplemented(format!( + _ => Err(DataFusionError::NotImplemented(format!( "Unsupported SQL binary operator {:?}", op ))), @@ -573,7 +573,7 @@ impl<'a, S: SchemaProvider> SqlToRel<'a, S> { args, }) } - _ => Err(ExecutionError::General(format!( + _ => 
Err(DataFusionError::Plan(format!( "Invalid function '{}'", name ))), @@ -583,7 +583,7 @@ impl<'a, S: SchemaProvider> SqlToRel<'a, S> { SQLExpr::Nested(e) => self.sql_to_rex(&e, &schema), - _ => Err(ExecutionError::General(format!( + _ => Err(DataFusionError::NotImplemented(format!( "Unsupported ast node {:?} in sqltorel", sql ))), @@ -611,7 +611,7 @@ pub fn convert_data_type(sql: &SQLDataType) -> Result { SQLDataType::Double => Ok(DataType::Float64), SQLDataType::Char(_) | SQLDataType::Varchar(_) => Ok(DataType::Utf8), SQLDataType::Timestamp => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)), - other => Err(ExecutionError::NotImplemented(format!( + other => Err(DataFusionError::NotImplemented(format!( "Unsupported SQL type {:?}", other ))), @@ -848,7 +848,7 @@ mod tests { let sql = "SELECT c1, MIN(c12) FROM aggregate_test_100 GROUP BY c1, c13"; let err = logical_plan(sql).expect_err("query should have failed"); assert_eq!( - "General(\"Projection references non-aggregate values\")", + "Plan(\"Projection references non-aggregate values\")", format!("{:?}", err) ); } @@ -858,7 +858,7 @@ mod tests { let sql = "SELECT c1, c13, MIN(c12) FROM aggregate_test_100 GROUP BY c1"; let err = logical_plan(sql).expect_err("query should have failed"); assert_eq!( - "General(\"Projection references non-aggregate values\")", + "Plan(\"Projection references non-aggregate values\")", format!("{:?}", err) ); } @@ -875,7 +875,7 @@ mod tests { let sql = "CREATE EXTERNAL TABLE t STORED AS CSV LOCATION 'foo.csv'"; let err = logical_plan(sql).expect_err("query should have failed"); assert_eq!( - "General(\"Column definitions required for CSV files. None found\")", + "Plan(\"Column definitions required for CSV files. 
None found\")", format!("{:?}", err) ); } @@ -886,7 +886,7 @@ mod tests { "CREATE EXTERNAL TABLE t(c1 int) STORED AS PARQUET LOCATION 'foo.parquet'"; let err = logical_plan(sql).expect_err("query should have failed"); assert_eq!( - "General(\"Column definitions can not be specified for PARQUET files.\")", + "Plan(\"Column definitions can not be specified for PARQUET files.\")", format!("{:?}", err) ); } @@ -949,7 +949,7 @@ mod tests { fn get_function_meta(&self, name: &str) -> Option> { let f: ScalarFunctionImplementation = - Arc::new(|_| Err(ExecutionError::NotImplemented("".to_string()))); + Arc::new(|_| Err(DataFusionError::NotImplemented("".to_string()))); match name { "my_sqrt" => Some(Arc::new(create_udf( "my_sqrt", diff --git a/rust/datafusion/tests/user_defined_plan.rs b/rust/datafusion/tests/user_defined_plan.rs index 6c50459b725..5536311ecc1 100644 --- a/rust/datafusion/tests/user_defined_plan.rs +++ b/rust/datafusion/tests/user_defined_plan.rs @@ -68,7 +68,7 @@ use arrow::{ util::pretty::pretty_format_batches, }; use datafusion::{ - error::{ExecutionError, Result}, + error::{DataFusionError, Result}, execution::context::ExecutionContextState, execution::context::QueryPlanner, logical_plan::{Expr, LogicalPlan, UserDefinedLogicalNode}, @@ -91,7 +91,7 @@ use async_trait::async_trait; async fn exec_sql(ctx: &mut ExecutionContext, sql: &str) -> Result { let df = ctx.sql(sql)?; let batches = df.collect().await?; - pretty_format_batches(&batches).map_err(|e| ExecutionError::ArrowError(e)) + pretty_format_batches(&batches).map_err(|e| DataFusionError::ArrowError(e)) } /// Create a test table. 
@@ -335,7 +335,7 @@ impl ExtensionPlanner for TopKPlanner { k: topk_node.k, })) } else { - Err(ExecutionError::General(format!( + Err(DataFusionError::Internal(format!( "Unknown extension node type {:?}", node ))) @@ -389,7 +389,7 @@ impl ExecutionPlan for TopKExec { input: children[0].clone(), k: self.k, })), - _ => Err(ExecutionError::General( + _ => Err(DataFusionError::Internal( "TopKExec wrong number of children".to_string(), )), } @@ -398,7 +398,7 @@ impl ExecutionPlan for TopKExec { /// Execute one partition and return an iterator over RecordBatch async fn execute(&self, partition: usize) -> Result { if 0 != partition { - return Err(ExecutionError::General(format!( + return Err(DataFusionError::Internal(format!( "TopKExec invalid partition {}", partition ))); @@ -504,7 +504,7 @@ impl Stream for TopKReader { BTreeMap::::new(), move |top_values, batch| async move { accumulate_batch(&batch, top_values, &k) - .map_err(ExecutionError::into_arrow_external_error) + .map_err(DataFusionError::into_arrow_external_error) }, );