From 5dc1c96354c6a443aa0a80338950087daf58999b Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Wed, 23 Jun 2021 18:27:32 -0700 Subject: [PATCH 1/2] use Into as argument type wherever applicable --- datafusion/src/execution/context.rs | 13 ++++++++---- datafusion/src/logical_plan/builder.rs | 20 +++++++++++-------- datafusion/src/logical_plan/expr.rs | 6 +++--- datafusion/src/optimizer/filter_push_down.rs | 12 +++++------ .../src/optimizer/projection_push_down.rs | 4 ++-- datafusion/src/optimizer/utils.rs | 2 +- datafusion/src/physical_plan/aggregates.rs | 4 ++-- .../src/physical_plan/expressions/average.rs | 8 ++++++-- .../src/physical_plan/expressions/count.rs | 8 ++++++-- .../src/physical_plan/expressions/min_max.rs | 16 +++++++++++---- .../physical_plan/expressions/nth_value.rs | 12 +++++------ .../physical_plan/expressions/row_number.rs | 4 ++-- .../src/physical_plan/expressions/sum.rs | 8 ++++++-- datafusion/src/physical_plan/planner.rs | 15 +++++++------- datafusion/src/physical_plan/udaf.rs | 4 ++-- 15 files changed, 83 insertions(+), 53 deletions(-) diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 926e2db9450a1..8c543f7a836c4 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -270,7 +270,7 @@ impl ExecutionContext { /// Creates a DataFrame for reading a CSV data source. pub fn read_csv( &mut self, - filename: &str, + filename: impl Into, options: CsvReadOptions, ) -> Result> { Ok(Arc::new(DataFrameImpl::new( @@ -280,7 +280,10 @@ impl ExecutionContext { } /// Creates a DataFrame for reading a Parquet data source. - pub fn read_parquet(&mut self, filename: &str) -> Result> { + pub fn read_parquet( + &mut self, + filename: impl Into, + ) -> Result> { Ok(Arc::new(DataFrameImpl::new( self.state.clone(), &LogicalPlanBuilder::scan_parquet( @@ -474,8 +477,9 @@ impl ExecutionContext { pub async fn write_csv( &self, plan: Arc, - path: String, + path: impl Into, ) -> Result<()> { + let path = path.into(); // create directory to contain the CSV files (one per partition) let fs_path = Path::new(&path); match fs::create_dir(fs_path) { @@ -511,9 +515,10 @@ impl ExecutionContext { pub async fn write_parquet( &self, plan: Arc, - path: String, + path: impl Into, writer_properties: Option, ) -> Result<()> { + let path = path.into(); // create directory to contain the Parquet files (one per partition) let fs_path = Path::new(&path); match fs::create_dir(fs_path) { diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs index 4b4ed0fb9d413..c9fd0f32497f3 100644 --- a/datafusion/src/logical_plan/builder.rs +++ b/datafusion/src/logical_plan/builder.rs @@ -115,21 +115,23 @@ impl LogicalPlanBuilder { /// Scan a CSV data source pub fn scan_csv( - path: &str, + path: impl Into, options: CsvReadOptions, projection: Option>, ) -> Result { - let provider = Arc::new(CsvFile::try_new(path, options)?); + let path = path.into(); + let provider = Arc::new(CsvFile::try_new(&path, options)?); Self::scan(path, provider, projection) } /// Scan a Parquet data source pub fn scan_parquet( - path: &str, + path: impl Into, projection: Option>, max_concurrency: usize, ) -> Result { - let provider = Arc::new(ParquetTable::try_new(path, max_concurrency)?); + let path = path.into(); + let provider = Arc::new(ParquetTable::try_new(&path, max_concurrency)?); Self::scan(path, provider, projection) } @@ -146,10 +148,12 @@ impl LogicalPlanBuilder { /// Convert a table provider into a builder with a TableScan pub fn scan( - table_name: &str, + table_name: impl Into, provider: Arc, projection: Option>, ) -> Result { + let table_name = table_name.into(); + if table_name.is_empty() { return Err(DataFusionError::Plan( "table_name cannot be empty".to_string(), @@ -164,17 +168,17 @@ impl LogicalPlanBuilder { DFSchema::new( p.iter() .map(|i| { - DFField::from_qualified(table_name, schema.field(*i).clone()) + DFField::from_qualified(&table_name, schema.field(*i).clone()) }) .collect(), ) }) .unwrap_or_else(|| { - DFSchema::try_from_qualified_schema(table_name, &schema) + DFSchema::try_from_qualified_schema(&table_name, &schema) })?; let table_scan = LogicalPlan::TableScan { - table_name: table_name.to_string(), + table_name, source: provider, projected_schema: Arc::new(projected_schema), projection, diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs index 1c5cc770c94ff..2b7f5fab5cbc5 100644 --- a/datafusion/src/logical_plan/expr.rs +++ b/datafusion/src/logical_plan/expr.rs @@ -44,10 +44,10 @@ pub struct Column { impl Column { /// Create Column from unqualified name. - pub fn from_name(name: String) -> Self { + pub fn from_name(name: impl Into) -> Self { Self { relation: None, - name, + name: name.into(), } } @@ -131,7 +131,7 @@ impl fmt::Display for Column { /// ``` /// # use datafusion::logical_plan::*; /// let expr = col("c1"); -/// assert_eq!(expr, Expr::Column(Column::from_name("c1".to_string()))); +/// assert_eq!(expr, Expr::Column(Column::from_name("c1"))); /// ``` /// /// ## Create the expression `c1 + c2` to add columns "c1" and "c2" together diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs index e5f8dcfbfffd6..7519e9413193f 100644 --- a/datafusion/src/optimizer/filter_push_down.rs +++ b/datafusion/src/optimizer/filter_push_down.rs @@ -890,8 +890,8 @@ mod tests { .join( &right, JoinType::Inner, - vec![Column::from_name("a".to_string())], - vec![Column::from_name("a".to_string())], + vec![Column::from_name("a")], + vec![Column::from_name("a")], )? .filter(col("a").lt_eq(lit(1i64)))? .build()?; @@ -933,8 +933,8 @@ mod tests { .join( &right, JoinType::Inner, - vec![Column::from_name("a".to_string())], - vec![Column::from_name("a".to_string())], + vec![Column::from_name("a")], + vec![Column::from_name("a")], )? // "b" and "c" are not shared by either side: they are only available together after the join .filter(col("c").lt_eq(col("b")))? @@ -972,8 +972,8 @@ mod tests { .join( &right, JoinType::Inner, - vec![Column::from_name("a".to_string())], - vec![Column::from_name("a".to_string())], + vec![Column::from_name("a")], + vec![Column::from_name("a")], )? .filter(col("b").lt_eq(lit(1i64)))? .build()?; diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs index 2544d89d04920..c52158964f5e3 100644 --- a/datafusion/src/optimizer/projection_push_down.rs +++ b/datafusion/src/optimizer/projection_push_down.rs @@ -236,7 +236,7 @@ fn optimize_plan( { window_expr.iter().try_for_each(|expr| { let name = &expr.name(schema)?; - let column = Column::from_name(name.to_string()); + let column = Column::from_name(name); if required_columns.contains(&column) { new_window_expr.push(expr.clone()); new_required_columns.insert(column); @@ -281,7 +281,7 @@ fn optimize_plan( let mut new_aggr_expr = Vec::new(); aggr_expr.iter().try_for_each(|expr| { let name = &expr.name(schema)?; - let column = Column::from_name(name.to_string()); + let column = Column::from_name(name); if required_columns.contains(&column) { new_aggr_expr.push(expr.clone()); diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs index 76f44b84657ca..8c1eb05f24bec 100644 --- a/datafusion/src/optimizer/utils.rs +++ b/datafusion/src/optimizer/utils.rs @@ -516,7 +516,7 @@ mod tests { &mut accum, )?; assert_eq!(1, accum.len()); - assert!(accum.contains(&Column::from_name("a".to_string()))); + assert!(accum.contains(&Column::from_name("a"))); Ok(()) } diff --git a/datafusion/src/physical_plan/aggregates.rs b/datafusion/src/physical_plan/aggregates.rs index 897c78fd46ff6..c297a959639a5 100644 --- a/datafusion/src/physical_plan/aggregates.rs +++ b/datafusion/src/physical_plan/aggregates.rs @@ -110,9 +110,9 @@ pub fn create_aggregate_expr( distinct: bool, args: &[Arc], input_schema: &Schema, - name: String, + name: impl Into, ) -> Result> { - // coerce + let name = name.into(); let arg = coerce(args, input_schema, &signature(fun))?; if arg.is_empty() { return Err(DataFusionError::Plan(format!( diff --git a/datafusion/src/physical_plan/expressions/average.rs b/datafusion/src/physical_plan/expressions/average.rs index 6a6332042188f..2e218191f6683 100644 --- a/datafusion/src/physical_plan/expressions/average.rs +++ b/datafusion/src/physical_plan/expressions/average.rs @@ -64,9 +64,13 @@ pub fn avg_return_type(arg_type: &DataType) -> Result { impl Avg { /// Create a new AVG aggregate function - pub fn new(expr: Arc, name: String, data_type: DataType) -> Self { + pub fn new( + expr: Arc, + name: impl Into, + data_type: DataType, + ) -> Self { Self { - name, + name: name.into(), expr, data_type, nullable: true, diff --git a/datafusion/src/physical_plan/expressions/count.rs b/datafusion/src/physical_plan/expressions/count.rs index 4a3fbe4fa7d3d..30c44f1c03b45 100644 --- a/datafusion/src/physical_plan/expressions/count.rs +++ b/datafusion/src/physical_plan/expressions/count.rs @@ -44,9 +44,13 @@ pub struct Count { impl Count { /// Create a new COUNT aggregate function. - pub fn new(expr: Arc, name: String, data_type: DataType) -> Self { + pub fn new( + expr: Arc, + name: impl Into, + data_type: DataType, + ) -> Self { Self { - name, + name: name.into(), expr, data_type, nullable: true, diff --git a/datafusion/src/physical_plan/expressions/min_max.rs b/datafusion/src/physical_plan/expressions/min_max.rs index 680e739cbf292..46e41f46a0e53 100644 --- a/datafusion/src/physical_plan/expressions/min_max.rs +++ b/datafusion/src/physical_plan/expressions/min_max.rs @@ -49,9 +49,13 @@ pub struct Max { impl Max { /// Create a new MAX aggregate function - pub fn new(expr: Arc, name: String, data_type: DataType) -> Self { + pub fn new( + expr: Arc, + name: impl Into, + data_type: DataType, + ) -> Self { Self { - name, + name: name.into(), expr, data_type, nullable: true, @@ -352,9 +356,13 @@ pub struct Min { impl Min { /// Create a new MIN aggregate function - pub fn new(expr: Arc, name: String, data_type: DataType) -> Self { + pub fn new( + expr: Arc, + name: impl Into, + data_type: DataType, + ) -> Self { Self { - name, + name: name.into(), expr, data_type, nullable: true, diff --git a/datafusion/src/physical_plan/expressions/nth_value.rs b/datafusion/src/physical_plan/expressions/nth_value.rs index 577c19b54ade0..b548f912b2236 100644 --- a/datafusion/src/physical_plan/expressions/nth_value.rs +++ b/datafusion/src/physical_plan/expressions/nth_value.rs @@ -45,12 +45,12 @@ pub struct NthValue { impl NthValue { /// Create a new FIRST_VALUE window aggregate function pub fn first_value( - name: String, + name: impl Into, expr: Arc, data_type: DataType, ) -> Self { Self { - name, + name: name.into(), expr, data_type, kind: NthValueKind::First, @@ -59,12 +59,12 @@ impl NthValue { /// Create a new LAST_VALUE window aggregate function pub fn last_value( - name: String, + name: impl Into, expr: Arc, data_type: DataType, ) -> Self { Self { - name, + name: name.into(), expr, data_type, kind: NthValueKind::Last, @@ -73,7 +73,7 @@ impl NthValue { /// Create a new NTH_VALUE window aggregate function pub fn nth_value( - name: String, + name: impl Into, expr: Arc, data_type: DataType, n: u32, @@ -83,7 +83,7 @@ impl NthValue { "nth_value expect n to be > 0".to_owned(), )), _ => Ok(Self { - name, + name: name.into(), expr, data_type, kind: NthValueKind::Nth(n), diff --git a/datafusion/src/physical_plan/expressions/row_number.rs b/datafusion/src/physical_plan/expressions/row_number.rs index 0444ee971f40d..6b488cc25af29 100644 --- a/datafusion/src/physical_plan/expressions/row_number.rs +++ b/datafusion/src/physical_plan/expressions/row_number.rs @@ -32,8 +32,8 @@ pub struct RowNumber { impl RowNumber { /// Create a new ROW_NUMBER function - pub fn new(name: String) -> Self { - Self { name } + pub fn new(name: impl Into) -> Self { + Self { name: name.into() } } } diff --git a/datafusion/src/physical_plan/expressions/sum.rs b/datafusion/src/physical_plan/expressions/sum.rs index 7bbbf99fa6598..c3f57e31e0d54 100644 --- a/datafusion/src/physical_plan/expressions/sum.rs +++ b/datafusion/src/physical_plan/expressions/sum.rs @@ -65,9 +65,13 @@ pub fn sum_return_type(arg_type: &DataType) -> Result { impl Sum { /// Create a new SUM aggregate function - pub fn new(expr: Arc, name: String, data_type: DataType) -> Self { + pub fn new( + expr: Arc, + name: impl Into, + data_type: DataType, + ) -> Self { Self { - name, + name: name.into(), expr, data_type, nullable: true, diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index a4c20a7f60ebc..2518c540419b4 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -972,11 +972,12 @@ impl DefaultPhysicalPlanner { pub fn create_window_expr_with_name( &self, e: &Expr, - name: String, + name: impl Into, logical_input_schema: &DFSchema, physical_input_schema: &Schema, ctx_state: &ExecutionContextState, ) -> Result> { + let name = name.into(); match e { Expr::WindowFunction { fun, @@ -1078,7 +1079,7 @@ impl DefaultPhysicalPlanner { pub fn create_aggregate_expr_with_name( &self, e: &Expr, - name: String, + name: impl Into, logical_input_schema: &DFSchema, physical_input_schema: &Schema, ctx_state: &ExecutionContextState, @@ -1217,7 +1218,7 @@ mod tests { let path = format!("{}/csv/aggregate_test_100.csv", testdata); let options = CsvReadOptions::new().schema_infer_max_records(100); - let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)? + let logical_plan = LogicalPlanBuilder::scan_csv(path, options, None)? // filter clause needs the type coercion rule applied .filter(col("c7").lt(lit(5_u8)))? .project(vec![col("c1"), col("c2")])? @@ -1262,7 +1263,7 @@ mod tests { let path = format!("{}/csv/aggregate_test_100.csv", testdata); let options = CsvReadOptions::new().schema_infer_max_records(100); - let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)? + let logical_plan = LogicalPlanBuilder::scan_csv(path, options, None)? .filter(col("c7").lt(col("c12")))? .build()?; @@ -1403,7 +1404,7 @@ mod tests { Expr::Literal(ScalarValue::Boolean(Some(true))), Expr::Literal(ScalarValue::Utf8(Some("a".to_string()))), ]; - let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)? + let logical_plan = LogicalPlanBuilder::scan_csv(path, options, None)? // filter clause needs the type coercion rule applied .filter(col("c12").lt(lit(0.05)))? .project(vec![col("c12").lt_eq(lit(0.025)).in_list(list, false)])? @@ -1430,7 +1431,7 @@ mod tests { let path = format!("{}/csv/aggregate_test_100.csv", testdata); let options = CsvReadOptions::new().schema_infer_max_records(100); - let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)? + let logical_plan = LogicalPlanBuilder::scan_csv(path, options, None)? .aggregate(vec![col("c1")], vec![sum(col("c2"))])? .build()?; @@ -1453,7 +1454,7 @@ mod tests { let path = format!("{}/csv/aggregate_test_100.csv", testdata); let options = CsvReadOptions::new().schema_infer_max_records(100); - let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)? + let logical_plan = LogicalPlanBuilder::scan_csv(path, options, None)? .aggregate(vec![col("c1")], vec![sum(col("c2"))])? .build()?; diff --git a/datafusion/src/physical_plan/udaf.rs b/datafusion/src/physical_plan/udaf.rs index f7515d326d0a5..c6d65ad5dd607 100644 --- a/datafusion/src/physical_plan/udaf.rs +++ b/datafusion/src/physical_plan/udaf.rs @@ -105,7 +105,7 @@ pub fn create_aggregate_expr( fun: &AggregateUDF, args: &[Arc], input_schema: &Schema, - name: String, + name: impl Into, ) -> Result> { // coerce let args = coerce(args, input_schema, &fun.signature)?; @@ -119,7 +119,7 @@ pub fn create_aggregate_expr( fun: fun.clone(), args: args.clone(), data_type: (fun.return_type)(&arg_types)?.as_ref().clone(), - name, + name: name.into(), })) } From 004bc9d39100bfe7a74201d6b1b0c34cc8281d3c Mon Sep 17 00:00:00 2001 From: Qingping Hou Date: Thu, 24 Jun 2021 22:48:21 -0700 Subject: [PATCH 2/2] switch from Into to AsRef for write_csv and write_parquet --- datafusion/src/execution/context.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 8c543f7a836c4..d2be17c65d092 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -477,11 +477,11 @@ impl ExecutionContext { pub async fn write_csv( &self, plan: Arc, - path: impl Into, + path: impl AsRef, ) -> Result<()> { - let path = path.into(); + let path = path.as_ref(); // create directory to contain the CSV files (one per partition) - let fs_path = Path::new(&path); + let fs_path = Path::new(path); match fs::create_dir(fs_path) { Ok(()) => { let mut tasks = vec![]; @@ -515,12 +515,12 @@ impl ExecutionContext { pub async fn write_parquet( &self, plan: Arc, - path: impl Into, + path: impl AsRef, writer_properties: Option, ) -> Result<()> { - let path = path.into(); + let path = path.as_ref(); // create directory to contain the Parquet files (one per partition) - let fs_path = Path::new(&path); + let fs_path = Path::new(path); match fs::create_dir(fs_path) { Ok(()) => { let mut tasks = vec![];