diff --git a/datafusion/src/datasource/csv.rs b/datafusion/src/datasource/csv.rs
index 906a1ce415f60..987c4fdb079dd 100644
--- a/datafusion/src/datasource/csv.rs
+++ b/datafusion/src/datasource/csv.rs
@@ -59,11 +59,12 @@ pub struct CsvFile {
 impl CsvFile {
     /// Attempt to initialize a new `CsvFile` from a file path
-    pub fn try_new(path: &str, options: CsvReadOptions) -> Result<Self> {
+    pub fn try_new(path: impl Into<String>, options: CsvReadOptions) -> Result<Self> {
+        let path = path.into();
         let schema = Arc::new(match options.schema {
             Some(s) => s.clone(),
             None => {
-                let filenames = common::build_file_list(path, options.file_extension)?;
+                let filenames = common::build_file_list(&path, options.file_extension)?;
                 if filenames.is_empty() {
                     return Err(DataFusionError::Plan(format!(
                         "No files found at {path} with file extension {file_extension}",
@@ -76,7 +77,7 @@ impl CsvFile {
         });
 
         Ok(Self {
-            source: Source::Path(path.to_string()),
+            source: Source::Path(path),
             schema,
             has_header: options.has_header,
             delimiter: options.delimiter,
diff --git a/datafusion/src/datasource/parquet.rs b/datafusion/src/datasource/parquet.rs
index abfb81d99887d..fd147413059ba 100644
--- a/datafusion/src/datasource/parquet.rs
+++ b/datafusion/src/datasource/parquet.rs
@@ -42,11 +42,12 @@ pub struct ParquetTable {
 impl ParquetTable {
     /// Attempt to initialize a new `ParquetTable` from a file path.
-    pub fn try_new(path: &str, max_concurrency: usize) -> Result<Self> {
-        let parquet_exec = ParquetExec::try_from_path(path, None, None, 0, 1, None)?;
+    pub fn try_new(path: impl Into<String>, max_concurrency: usize) -> Result<Self> {
+        let path = path.into();
+        let parquet_exec = ParquetExec::try_from_path(&path, None, None, 0, 1, None)?;
         let schema = parquet_exec.schema();
         Ok(Self {
-            path: path.to_string(),
+            path,
             schema,
             statistics: parquet_exec.statistics().to_owned(),
             max_concurrency,
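Usage sketch for the two datasource constructors above (not part of the diff; the file paths and the max-concurrency value are illustrative, and `CsvReadOptions` is assumed to be re-exported via `datafusion::prelude`). With `impl Into<String>`, both `&str` and an owned `String` are accepted, so a `format!`-built path no longer needs a borrow or `.to_string()`:

    use datafusion::datasource::csv::CsvFile;
    use datafusion::datasource::parquet::ParquetTable;
    use datafusion::error::Result;
    use datafusion::prelude::CsvReadOptions;

    fn build_tables() -> Result<()> {
        // A borrowed &str works exactly as before.
        let _csv = CsvFile::try_new("data/example.csv", CsvReadOptions::new())?;

        // An owned String is now moved in directly, with no extra copy.
        let path = format!("{}/example.parquet", "data");
        let _parquet = ParquetTable::try_new(path, 4)?;
        Ok(())
    }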
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 8ce408de86a5b..c196b802699a2 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -270,7 +270,7 @@ impl ExecutionContext {
     /// Creates a DataFrame for reading a CSV data source.
     pub fn read_csv(
         &mut self,
-        filename: &str,
+        filename: impl Into<String>,
         options: CsvReadOptions,
     ) -> Result<Arc<dyn DataFrame>> {
         Ok(Arc::new(DataFrameImpl::new(
@@ -280,7 +280,10 @@ impl ExecutionContext {
     }
 
     /// Creates a DataFrame for reading a Parquet data source.
-    pub fn read_parquet(&mut self, filename: &str) -> Result<Arc<dyn DataFrame>> {
+    pub fn read_parquet(
+        &mut self,
+        filename: impl Into<String>,
+    ) -> Result<Arc<dyn DataFrame>> {
         Ok(Arc::new(DataFrameImpl::new(
             self.state.clone(),
             &LogicalPlanBuilder::scan_parquet(
@@ -474,10 +477,11 @@ impl ExecutionContext {
     pub async fn write_csv(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        path: String,
+        path: impl AsRef<str>,
     ) -> Result<()> {
+        let path = path.as_ref();
         // create directory to contain the CSV files (one per partition)
-        let fs_path = Path::new(&path);
+        let fs_path = Path::new(path);
         match fs::create_dir(fs_path) {
             Ok(()) => {
                 let mut tasks = vec![];
@@ -511,11 +515,12 @@ impl ExecutionContext {
     pub async fn write_parquet(
         &self,
         plan: Arc<dyn ExecutionPlan>,
-        path: String,
+        path: impl AsRef<str>,
         writer_properties: Option<WriterProperties>,
     ) -> Result<()> {
+        let path = path.as_ref();
         // create directory to contain the Parquet files (one per partition)
-        let fs_path = Path::new(&path);
+        let fs_path = Path::new(path);
         match fs::create_dir(fs_path) {
             Ok(()) => {
                 let mut tasks = vec![];
diff --git a/datafusion/src/logical_plan/builder.rs b/datafusion/src/logical_plan/builder.rs
index 17fe6636439c7..1a53e2185a4bc 100644
--- a/datafusion/src/logical_plan/builder.rs
+++ b/datafusion/src/logical_plan/builder.rs
@@ -115,19 +115,20 @@ impl LogicalPlanBuilder {
     /// Scan a CSV data source
     pub fn scan_csv(
-        path: &str,
+        path: impl Into<String>,
         options: CsvReadOptions,
         projection: Option<Vec<usize>>,
     ) -> Result<Self> {
-        Self::scan_csv_with_name(path, options, projection, path)
+        let path = path.into();
+        Self::scan_csv_with_name(path.clone(), options, projection, path)
     }
 
     /// Scan a CSV data source and register it with a given table name
     pub fn scan_csv_with_name(
-        path: &str,
+        path: impl Into<String>,
         options: CsvReadOptions,
         projection: Option<Vec<usize>>,
-        table_name: &str,
+        table_name: impl Into<String>,
     ) -> Result<Self> {
         let provider = Arc::new(CsvFile::try_new(path, options)?);
         Self::scan(table_name, provider, projection)
@@ -135,19 +136,20 @@ impl LogicalPlanBuilder {
 
     /// Scan a Parquet data source
     pub fn scan_parquet(
-        path: &str,
+        path: impl Into<String>,
         projection: Option<Vec<usize>>,
         max_concurrency: usize,
     ) -> Result<Self> {
-        Self::scan_parquet_with_name(path, projection, max_concurrency, path)
+        let path = path.into();
+        Self::scan_parquet_with_name(path.clone(), projection, max_concurrency, path)
     }
 
     /// Scan a Parquet data source and register it with a given table name
     pub fn scan_parquet_with_name(
-        path: &str,
+        path: impl Into<String>,
         projection: Option<Vec<usize>>,
         max_concurrency: usize,
-        table_name: &str,
+        table_name: impl Into<String>,
     ) -> Result<Self> {
         let provider = Arc::new(ParquetTable::try_new(path, max_concurrency)?);
         Self::scan(table_name, provider, projection)
@@ -166,10 +168,12 @@ impl LogicalPlanBuilder {
 
     /// Convert a table provider into a builder with a TableScan
     pub fn scan(
-        table_name: &str,
+        table_name: impl Into<String>,
         provider: Arc<dyn TableProvider>,
         projection: Option<Vec<usize>>,
     ) -> Result<Self> {
+        let table_name = table_name.into();
+
         if table_name.is_empty() {
             return Err(DataFusionError::Plan(
                 "table_name cannot be empty".to_string(),
@@ -184,17 +188,17 @@ impl LogicalPlanBuilder {
             DFSchema::new(
                 p.iter()
                     .map(|i| {
-                        DFField::from_qualified(table_name, schema.field(*i).clone())
+                        DFField::from_qualified(&table_name, schema.field(*i).clone())
                     })
                     .collect(),
             )
         })
         .unwrap_or_else(|| {
-            DFSchema::try_from_qualified_schema(table_name, &schema)
+            DFSchema::try_from_qualified_schema(&table_name, &schema)
         })?;
 
         let table_scan = LogicalPlan::TableScan {
-            table_name,
+            table_name,
             source: provider,
             projected_schema: Arc::new(projected_schema),
             projection,
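Call-site sketch for the `ExecutionContext` changes above (not part of the diff; an already-built physical plan is taken as a parameter, and the output directories are illustrative). Since `write_csv`/`write_parquet` take `impl AsRef<str>`, a string literal no longer has to be converted to an owned `String` just to satisfy the signature, and `read_csv`/`read_parquet` likewise accept `String` or `&str`:

    use std::sync::Arc;
    use datafusion::error::Result;
    use datafusion::physical_plan::ExecutionPlan;
    use datafusion::prelude::ExecutionContext;

    async fn save(ctx: &ExecutionContext, plan: Arc<dyn ExecutionPlan>) -> Result<()> {
        // Each call creates a directory containing one file per partition.
        ctx.write_csv(plan.clone(), "out/csv").await?;
        ctx.write_parquet(plan, "out/parquet", None).await?;
        Ok(())
    }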
diff --git a/datafusion/src/logical_plan/expr.rs b/datafusion/src/logical_plan/expr.rs
index d20b1f698238c..1bf3b65d9af00 100644
--- a/datafusion/src/logical_plan/expr.rs
+++ b/datafusion/src/logical_plan/expr.rs
@@ -44,10 +44,10 @@ pub struct Column {
 impl Column {
     /// Create Column from unqualified name.
-    pub fn from_name(name: String) -> Self {
+    pub fn from_name(name: impl Into<String>) -> Self {
         Self {
             relation: None,
-            name,
+            name: name.into(),
         }
     }
 
@@ -131,7 +131,7 @@ impl fmt::Display for Column {
 /// ```
 /// # use datafusion::logical_plan::*;
 /// let expr = col("c1");
-/// assert_eq!(expr, Expr::Column(Column::from_name("c1".to_string())));
+/// assert_eq!(expr, Expr::Column(Column::from_name("c1")));
 /// ```
 ///
 /// ## Create the expression `c1 + c2` to add columns "c1" and "c2" together
diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs
index 7b1ff326c3c6e..c1d81fe629345 100644
--- a/datafusion/src/optimizer/filter_push_down.rs
+++ b/datafusion/src/optimizer/filter_push_down.rs
@@ -890,8 +890,8 @@ mod tests {
             .join(
                 &right,
                 JoinType::Inner,
-                vec![Column::from_name("a".to_string())],
-                vec![Column::from_name("a".to_string())],
+                vec![Column::from_name("a")],
+                vec![Column::from_name("a")],
             )?
             .filter(col("a").lt_eq(lit(1i64)))?
             .build()?;
@@ -933,8 +933,8 @@ mod tests {
             .join(
                 &right,
                 JoinType::Inner,
-                vec![Column::from_name("a".to_string())],
-                vec![Column::from_name("a".to_string())],
+                vec![Column::from_name("a")],
+                vec![Column::from_name("a")],
             )?
             // "b" and "c" are not shared by either side: they are only available together after the join
             .filter(col("c").lt_eq(col("b")))?
@@ -972,8 +972,8 @@ mod tests {
             .join(
                 &right,
                 JoinType::Inner,
-                vec![Column::from_name("a".to_string())],
-                vec![Column::from_name("a".to_string())],
+                vec![Column::from_name("a")],
+                vec![Column::from_name("a")],
             )?
             .filter(col("b").lt_eq(lit(1i64)))?
             .build()?;
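The test churn above is the payoff of the `Column::from_name` change: the `Into<String>` bound accepts `&str`, `String`, and `&String` alike (the last via the standard `From<&String> for String` impl, which is what the optimizer call sites below rely on). A minimal sketch, not part of the diff:

    use datafusion::logical_plan::Column;

    fn main() {
        // Both forms produce the same unqualified column.
        let a = Column::from_name("a");
        let b = Column::from_name("a".to_string());
        assert_eq!(a, b);
    }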
diff --git a/datafusion/src/optimizer/projection_push_down.rs b/datafusion/src/optimizer/projection_push_down.rs
index 4bf2b6e797f8e..3c8f1ee4ceb58 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -241,7 +241,7 @@ fn optimize_plan(
         {
             window_expr.iter().try_for_each(|expr| {
                 let name = &expr.name(schema)?;
-                let column = Column::from_name(name.to_string());
+                let column = Column::from_name(name);
                 if required_columns.contains(&column) {
                     new_window_expr.push(expr.clone());
                     new_required_columns.insert(column);
@@ -286,7 +286,7 @@ fn optimize_plan(
             let mut new_aggr_expr = Vec::new();
             aggr_expr.iter().try_for_each(|expr| {
                 let name = &expr.name(schema)?;
-                let column = Column::from_name(name.to_string());
+                let column = Column::from_name(name);
                 if required_columns.contains(&column) {
                     new_aggr_expr.push(expr.clone());
diff --git a/datafusion/src/optimizer/utils.rs b/datafusion/src/optimizer/utils.rs
index 394308f5af801..ae3e196c22251 100644
--- a/datafusion/src/optimizer/utils.rs
+++ b/datafusion/src/optimizer/utils.rs
@@ -516,7 +516,7 @@ mod tests {
             &mut accum,
         )?;
         assert_eq!(1, accum.len());
-        assert!(accum.contains(&Column::from_name("a".to_string())));
+        assert!(accum.contains(&Column::from_name("a")));
         Ok(())
     }
diff --git a/datafusion/src/physical_plan/aggregates.rs b/datafusion/src/physical_plan/aggregates.rs
index 897c78fd46ff6..c297a959639a5 100644
--- a/datafusion/src/physical_plan/aggregates.rs
+++ b/datafusion/src/physical_plan/aggregates.rs
@@ -110,9 +110,9 @@ pub fn create_aggregate_expr(
     distinct: bool,
     args: &[Arc<dyn PhysicalExpr>],
     input_schema: &Schema,
-    name: String,
+    name: impl Into<String>,
 ) -> Result<Arc<dyn AggregateExpr>> {
-    // coerce
+    let name = name.into();
     let arg = coerce(args, input_schema, &signature(fun))?;
     if arg.is_empty() {
         return Err(DataFusionError::Plan(format!(
diff --git a/datafusion/src/physical_plan/expressions/average.rs b/datafusion/src/physical_plan/expressions/average.rs
index 6a6332042188f..2e218191f6683 100644
--- a/datafusion/src/physical_plan/expressions/average.rs
+++ b/datafusion/src/physical_plan/expressions/average.rs
@@ -64,9 +64,13 @@ pub fn avg_return_type(arg_type: &DataType) -> Result<DataType> {
 impl Avg {
     /// Create a new AVG aggregate function
-    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
         Self {
-            name,
+            name: name.into(),
             expr,
             data_type,
             nullable: true,
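All of these constructors follow the same idiom: the generic argument is converted to a concrete `String` in the first statement (`let name = name.into();`), so the rest of the body stays non-generic and only a thin shim is monomorphized per caller type. A standalone sketch of the pattern, with hypothetical names:

    struct Aggregate {
        name: String,
    }

    // The generic shell accepts &str, String, &String, Cow<str>, ...
    fn make_aggregate(name: impl Into<String>) -> Aggregate {
        // Convert once, up front; everything below this line is shared
        // by every instantiation instead of being duplicated per type.
        let name = name.into();
        Aggregate { name }
    }

    fn main() {
        let a = make_aggregate("sum(c2)");
        let b = make_aggregate(String::from("sum(c2)"));
        assert_eq!(a.name, b.name);
    }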
diff --git a/datafusion/src/physical_plan/expressions/count.rs b/datafusion/src/physical_plan/expressions/count.rs
index 4a3fbe4fa7d3d..30c44f1c03b45 100644
--- a/datafusion/src/physical_plan/expressions/count.rs
+++ b/datafusion/src/physical_plan/expressions/count.rs
@@ -44,9 +44,13 @@ pub struct Count {
 impl Count {
     /// Create a new COUNT aggregate function.
-    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
         Self {
-            name,
+            name: name.into(),
             expr,
             data_type,
             nullable: true,
diff --git a/datafusion/src/physical_plan/expressions/min_max.rs b/datafusion/src/physical_plan/expressions/min_max.rs
index 680e739cbf292..46e41f46a0e53 100644
--- a/datafusion/src/physical_plan/expressions/min_max.rs
+++ b/datafusion/src/physical_plan/expressions/min_max.rs
@@ -49,9 +49,13 @@ pub struct Max {
 impl Max {
     /// Create a new MAX aggregate function
-    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
         Self {
-            name,
+            name: name.into(),
             expr,
             data_type,
             nullable: true,
@@ -352,9 +356,13 @@ pub struct Min {
 impl Min {
     /// Create a new MIN aggregate function
-    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
         Self {
-            name,
+            name: name.into(),
             expr,
             data_type,
             nullable: true,
diff --git a/datafusion/src/physical_plan/expressions/nth_value.rs b/datafusion/src/physical_plan/expressions/nth_value.rs
index 577c19b54ade0..b548f912b2236 100644
--- a/datafusion/src/physical_plan/expressions/nth_value.rs
+++ b/datafusion/src/physical_plan/expressions/nth_value.rs
@@ -45,12 +45,12 @@ pub struct NthValue {
 impl NthValue {
     /// Create a new FIRST_VALUE window aggregate function
     pub fn first_value(
-        name: String,
+        name: impl Into<String>,
         expr: Arc<dyn PhysicalExpr>,
         data_type: DataType,
     ) -> Self {
         Self {
-            name,
+            name: name.into(),
             expr,
             data_type,
             kind: NthValueKind::First,
@@ -59,12 +59,12 @@ impl NthValue {
 
     /// Create a new LAST_VALUE window aggregate function
     pub fn last_value(
-        name: String,
+        name: impl Into<String>,
         expr: Arc<dyn PhysicalExpr>,
         data_type: DataType,
     ) -> Self {
         Self {
-            name,
+            name: name.into(),
             expr,
             data_type,
             kind: NthValueKind::Last,
@@ -73,7 +73,7 @@ impl NthValue {
 
     /// Create a new NTH_VALUE window aggregate function
     pub fn nth_value(
-        name: String,
+        name: impl Into<String>,
         expr: Arc<dyn PhysicalExpr>,
         data_type: DataType,
         n: u32,
@@ -83,7 +83,7 @@ impl NthValue {
                 "nth_value expect n to be > 0".to_owned(),
             )),
             _ => Ok(Self {
-                name,
+                name: name.into(),
                 expr,
                 data_type,
                 kind: NthValueKind::Nth(n),
diff --git a/datafusion/src/physical_plan/expressions/row_number.rs b/datafusion/src/physical_plan/expressions/row_number.rs
index 0444ee971f40d..6b488cc25af29 100644
--- a/datafusion/src/physical_plan/expressions/row_number.rs
+++ b/datafusion/src/physical_plan/expressions/row_number.rs
@@ -32,8 +32,8 @@ pub struct RowNumber {
 impl RowNumber {
     /// Create a new ROW_NUMBER function
-    pub fn new(name: String) -> Self {
-        Self { name }
+    pub fn new(name: impl Into<String>) -> Self {
+        Self { name: name.into() }
     }
 }
diff --git a/datafusion/src/physical_plan/expressions/sum.rs b/datafusion/src/physical_plan/expressions/sum.rs
index 7bbbf99fa6598..c3f57e31e0d54 100644
--- a/datafusion/src/physical_plan/expressions/sum.rs
+++ b/datafusion/src/physical_plan/expressions/sum.rs
@@ -65,9 +65,13 @@ pub fn sum_return_type(arg_type: &DataType) -> Result<DataType> {
 impl Sum {
     /// Create a new SUM aggregate function
-    pub fn new(expr: Arc<dyn PhysicalExpr>, name: String, data_type: DataType) -> Self {
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
         Self {
-            name,
+            name: name.into(),
             expr,
             data_type,
             nullable: true,
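Call-site sketch for the physical expression constructors above (not part of the diff; the input expression is taken as a parameter and the display names and data type are illustrative). Previously each `new` required an owned `String` for the name:

    use std::sync::Arc;
    use arrow::datatypes::DataType;
    use datafusion::physical_plan::expressions::{Max, Min};
    use datafusion::physical_plan::PhysicalExpr;

    // `input` is any already-built physical expression for the column.
    fn min_max(input: Arc<dyn PhysicalExpr>) -> (Min, Max) {
        (
            Min::new(input.clone(), "min(c2)", DataType::Int64),
            Max::new(input, "max(c2)", DataType::Int64),
        )
    }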
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index d59004243533e..c3bb9a80136f1 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -1018,11 +1018,12 @@ impl DefaultPhysicalPlanner {
     pub fn create_window_expr_with_name(
         &self,
         e: &Expr,
-        name: String,
+        name: impl Into<String>,
         logical_input_schema: &DFSchema,
         physical_input_schema: &Schema,
         ctx_state: &ExecutionContextState,
     ) -> Result<Arc<dyn WindowExpr>> {
+        let name = name.into();
         match e {
             Expr::WindowFunction {
                 fun,
@@ -1124,7 +1125,7 @@ impl DefaultPhysicalPlanner {
     pub fn create_aggregate_expr_with_name(
         &self,
         e: &Expr,
-        name: String,
+        name: impl Into<String>,
         logical_input_schema: &DFSchema,
         physical_input_schema: &Schema,
         ctx_state: &ExecutionContextState,
@@ -1263,7 +1264,7 @@ mod tests {
         let path = format!("{}/csv/aggregate_test_100.csv", testdata);
         let options = CsvReadOptions::new().schema_infer_max_records(100);
 
-        let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
+        let logical_plan = LogicalPlanBuilder::scan_csv(path, options, None)?
             // filter clause needs the type coercion rule applied
             .filter(col("c7").lt(lit(5_u8)))?
             .project(vec![col("c1"), col("c2")])?
@@ -1308,7 +1309,7 @@ mod tests {
         let path = format!("{}/csv/aggregate_test_100.csv", testdata);
         let options = CsvReadOptions::new().schema_infer_max_records(100);
 
-        let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
+        let logical_plan = LogicalPlanBuilder::scan_csv(path, options, None)?
             .filter(col("c7").lt(col("c12")))?
             .build()?;
@@ -1449,7 +1450,7 @@ mod tests {
             Expr::Literal(ScalarValue::Boolean(Some(true))),
             Expr::Literal(ScalarValue::Utf8(Some("a".to_string()))),
         ];
-        let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
+        let logical_plan = LogicalPlanBuilder::scan_csv(path, options, None)?
             // filter clause needs the type coercion rule applied
             .filter(col("c12").lt(lit(0.05)))?
             .project(vec![col("c12").lt_eq(lit(0.025)).in_list(list, false)])?
@@ -1476,7 +1477,7 @@ mod tests {
         let path = format!("{}/csv/aggregate_test_100.csv", testdata);
         let options = CsvReadOptions::new().schema_infer_max_records(100);
 
-        let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
+        let logical_plan = LogicalPlanBuilder::scan_csv(path, options, None)?
             .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
             .build()?;
@@ -1499,7 +1500,7 @@ mod tests {
         let path = format!("{}/csv/aggregate_test_100.csv", testdata);
         let options = CsvReadOptions::new().schema_infer_max_records(100);
 
-        let logical_plan = LogicalPlanBuilder::scan_csv(&path, options, None)?
+        let logical_plan = LogicalPlanBuilder::scan_csv(path, options, None)?
            .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
            .build()?;
diff --git a/datafusion/src/physical_plan/udaf.rs b/datafusion/src/physical_plan/udaf.rs
index f7515d326d0a5..c6d65ad5dd607 100644
--- a/datafusion/src/physical_plan/udaf.rs
+++ b/datafusion/src/physical_plan/udaf.rs
@@ -105,7 +105,7 @@ pub fn create_aggregate_expr(
     fun: &AggregateUDF,
     args: &[Arc<dyn PhysicalExpr>],
     input_schema: &Schema,
-    name: String,
+    name: impl Into<String>,
 ) -> Result<Arc<dyn AggregateExpr>> {
     // coerce
     let args = coerce(args, input_schema, &fun.signature)?;
@@ -119,7 +119,7 @@ pub fn create_aggregate_expr(
         fun: fun.clone(),
         args: args.clone(),
         data_type: (fun.return_type)(&arg_types)?.as_ref().clone(),
-        name,
+        name: name.into(),
     }))
 }
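End to end, the planner tests above show the net effect: a `format!`-built path can be moved straight into `scan_csv`, which clones it internally to reuse it as the default table name instead of borrowing it. A sketch, not part of the diff, with a hypothetical test-data root:

    use datafusion::error::Result;
    use datafusion::logical_plan::LogicalPlanBuilder;
    use datafusion::prelude::CsvReadOptions;

    fn plan_from(testdata: &str) -> Result<()> {
        let path = format!("{}/csv/aggregate_test_100.csv", testdata);
        // The owned String is passed by value; no &path and no .to_string().
        let _plan = LogicalPlanBuilder::scan_csv(path, CsvReadOptions::new(), None)?
            .build()?;
        Ok(())
    }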