diff --git a/datafusion/core/src/datasource/datasource.rs b/datafusion/core/src/datasource/datasource.rs index 38e3193c4935..c52e4ad4d92c 100644 --- a/datafusion/core/src/datasource/datasource.rs +++ b/datafusion/core/src/datasource/datasource.rs @@ -86,5 +86,5 @@ pub trait TableProvider: Sync + Send { #[async_trait] pub trait TableProviderFactory: Sync + Send { /// Create a TableProvider with the given url - async fn create(&self, url: &str) -> Result>; + fn create(&self, url: &str) -> Result>; } diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index c66199e073b7..617933551156 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -429,7 +429,7 @@ impl SessionContext { cmd.file_type )) })?; - let table = (*factory).create(cmd.location.as_str()).await?; + let table = (*factory).create(cmd.location.as_str())?; self.register_table(cmd.name.as_str(), table)?; let plan = LogicalPlanBuilder::empty(false).build()?; Ok(Arc::new(DataFrame::new(self.state.clone(), &plan))) diff --git a/datafusion/core/src/test_util.rs b/datafusion/core/src/test_util.rs index 769ab47feb69..79d0ff36abc1 100644 --- a/datafusion/core/src/test_util.rs +++ b/datafusion/core/src/test_util.rs @@ -328,10 +328,7 @@ pub struct TestTableFactory {} #[async_trait] impl TableProviderFactory for TestTableFactory { - async fn create( - &self, - url: &str, - ) -> datafusion_common::Result> { + fn create(&self, url: &str) -> datafusion_common::Result> { Ok(Arc::new(TestTableProvider { url: url.to_string(), })) diff --git a/datafusion/proto/README.md b/datafusion/proto/README.md index 8c8962e506a6..a3878447e042 100644 --- a/datafusion/proto/README.md +++ b/datafusion/proto/README.md @@ -63,7 +63,7 @@ async fn main() -> Result<()> { ?; let plan = ctx.table("t1")?.to_logical_plan()?; let bytes = logical_plan_to_bytes(&plan)?; - let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?; + let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?; assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip)); Ok(()) } diff --git a/datafusion/proto/examples/plan_serde.rs b/datafusion/proto/examples/plan_serde.rs index eed372476fff..d98d88b2a46a 100644 --- a/datafusion/proto/examples/plan_serde.rs +++ b/datafusion/proto/examples/plan_serde.rs @@ -26,7 +26,7 @@ async fn main() -> Result<()> { .await?; let plan = ctx.table("t1")?.to_logical_plan()?; let bytes = logical_plan_to_bytes(&plan)?; - let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?; + let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?; assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip)); Ok(()) } diff --git a/datafusion/proto/src/bytes/mod.rs b/datafusion/proto/src/bytes/mod.rs index 5695bf50686a..475825cc1eb0 100644 --- a/datafusion/proto/src/bytes/mod.rs +++ b/datafusion/proto/src/bytes/mod.rs @@ -136,27 +136,24 @@ pub fn logical_plan_to_bytes_with_extension_codec( /// Deserialize a LogicalPlan from json #[cfg(feature = "json")] -pub async fn logical_plan_from_json( - json: &str, - ctx: &SessionContext, -) -> Result { +pub fn logical_plan_from_json(json: &str, ctx: &SessionContext) -> Result { let back: protobuf::LogicalPlanNode = serde_json::from_str(json) .map_err(|e| DataFusionError::Plan(format!("Error serializing plan: {}", e)))?; let extension_codec = DefaultExtensionCodec {}; - back.try_into_logical_plan(ctx, &extension_codec).await + back.try_into_logical_plan(ctx, &extension_codec) } /// Deserialize a LogicalPlan from bytes -pub async fn logical_plan_from_bytes( +pub fn logical_plan_from_bytes( bytes: &[u8], ctx: &SessionContext, ) -> Result { let extension_codec = DefaultExtensionCodec {}; - logical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec).await + logical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec) } /// Deserialize a LogicalPlan from bytes -pub async fn logical_plan_from_bytes_with_extension_codec( +pub fn logical_plan_from_bytes_with_extension_codec( bytes: &[u8], ctx: &SessionContext, extension_codec: &dyn LogicalExtensionCodec, @@ -164,7 +161,7 @@ pub async fn logical_plan_from_bytes_with_extension_codec( let protobuf = protobuf::LogicalPlanNode::decode(bytes).map_err(|e| { DataFusionError::Plan(format!("Error decoding expr as protobuf: {}", e)) })?; - protobuf.try_into_logical_plan(ctx, extension_codec).await + protobuf.try_into_logical_plan(ctx, extension_codec) } #[derive(Debug)] @@ -189,7 +186,7 @@ impl LogicalExtensionCodec for DefaultExtensionCodec { )) } - async fn try_decode_table_provider( + fn try_decode_table_provider( &self, _buf: &[u8], _schema: SchemaRef, @@ -243,12 +240,12 @@ mod test { assert_eq!(actual, expected); } - #[tokio::test] + #[test] #[cfg(feature = "json")] - async fn json_to_plan() { + fn json_to_plan() { let input = r#"{"emptyRelation":{}}"#.to_string(); let ctx = SessionContext::new(); - let actual = logical_plan_from_json(&input, &ctx).await.unwrap(); + let actual = logical_plan_from_json(&input, &ctx).unwrap(); let result = matches!(actual, LogicalPlan::EmptyRelation(_)); assert!(result, "Should parse empty relation"); } diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs index 21ded84a57ca..b3b30106ab42 100644 --- a/datafusion/proto/src/lib.rs +++ b/datafusion/proto/src/lib.rs @@ -130,8 +130,7 @@ mod roundtrip_tests { let bytes = logical_plan_to_bytes_with_extension_codec(&topk_plan, &extension_codec)?; let logical_round_trip = - logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &extension_codec) - .await?; + logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &extension_codec)?; assert_eq!( format!("{:?}", topk_plan), format!("{:?}", logical_round_trip) @@ -172,7 +171,7 @@ mod roundtrip_tests { )) } - async fn try_decode_table_provider( + fn try_decode_table_provider( &self, buf: &[u8], _schema: SchemaRef, @@ -189,7 +188,7 @@ mod roundtrip_tests { .get("testtable") .expect("Unable to find testtable factory") .clone(); - let provider = (*factory).create(msg.url.as_str()).await?; + let provider = (*factory).create(msg.url.as_str())?; Ok(provider) } @@ -229,7 +228,7 @@ mod roundtrip_tests { let scan = ctx.table("t")?.to_logical_plan()?; let bytes = logical_plan_to_bytes_with_extension_codec(&scan, &codec)?; let logical_round_trip = - logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &codec).await?; + logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &codec)?; assert_eq!(format!("{:?}", scan), format!("{:?}", logical_round_trip)); Ok(()) } @@ -257,7 +256,7 @@ mod roundtrip_tests { println!("{:?}", plan); let bytes = logical_plan_to_bytes(&plan)?; - let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?; + let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?; assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip)); Ok(()) @@ -270,7 +269,7 @@ mod roundtrip_tests { .await?; let plan = ctx.table("t1")?.to_logical_plan()?; let bytes = logical_plan_to_bytes(&plan)?; - let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?; + let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?; assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip)); Ok(()) } @@ -284,7 +283,7 @@ mod roundtrip_tests { .await?; let plan = ctx.sql("SELECT * FROM view_t1").await?.to_logical_plan()?; let bytes = logical_plan_to_bytes(&plan)?; - let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?; + let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?; assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip)); Ok(()) } @@ -431,7 +430,7 @@ mod roundtrip_tests { } } - async fn try_decode_table_provider( + fn try_decode_table_provider( &self, _buf: &[u8], _schema: SchemaRef, diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs index b9f34ff02265..48833688c3dd 100644 --- a/datafusion/proto/src/logical_plan.rs +++ b/datafusion/proto/src/logical_plan.rs @@ -86,7 +86,7 @@ pub trait AsLogicalPlan: Debug + Send + Sync + Clone { B: BufMut, Self: Sized; - async fn try_into_logical_plan( + fn try_into_logical_plan( &self, ctx: &SessionContext, extension_codec: &dyn LogicalExtensionCodec, @@ -130,7 +130,7 @@ pub trait LogicalExtensionCodec: Debug + Send + Sync { buf: &mut Vec, ) -> Result<(), DataFusionError>; - async fn try_decode_table_provider( + fn try_decode_table_provider( &self, buf: &[u8], schema: SchemaRef, @@ -170,7 +170,7 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec { )) } - async fn try_decode_table_provider( + fn try_decode_table_provider( &self, _buf: &[u8], _schema: SchemaRef, @@ -196,7 +196,7 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec { macro_rules! into_logical_plan { ($PB:expr, $CTX:expr, $CODEC:expr) => {{ if let Some(field) = $PB.as_ref() { - field.as_ref().try_into_logical_plan($CTX, $CODEC).await + field.as_ref().try_into_logical_plan($CTX, $CODEC) } else { Err(proto_error("Missing required field in protobuf")) } @@ -301,7 +301,7 @@ impl AsLogicalPlan for LogicalPlanNode { }) } - async fn try_into_logical_plan( + fn try_into_logical_plan( &self, ctx: &SessionContext, extension_codec: &dyn LogicalExtensionCodec, @@ -487,9 +487,11 @@ impl AsLogicalPlan for LogicalPlanNode { .iter() .map(|expr| parse_expr(expr, ctx)) .collect::, _>>()?; - let provider = extension_codec - .try_decode_table_provider(&scan.custom_table_data, schema, ctx) - .await?; + let provider = extension_codec.try_decode_table_provider( + &scan.custom_table_data, + schema, + ctx, + )?; LogicalPlanBuilder::scan_with_filters( &scan.table_name, @@ -591,7 +593,7 @@ impl AsLogicalPlan for LogicalPlanNode { .input.clone().ok_or_else(|| DataFusionError::Internal(String::from( "Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input.", )))? - .try_into_logical_plan(ctx, extension_codec).await?; + .try_into_logical_plan(ctx, extension_codec)?; let definition = if !create_view.definition.is_empty() { Some(create_view.definition.clone()) } else { @@ -716,7 +718,7 @@ impl AsLogicalPlan for LogicalPlanNode { LogicalPlanType::Union(union) => { let mut input_plans: Vec = vec![]; for i in union.inputs.iter() { - let res = i.try_into_logical_plan(ctx, extension_codec).await?; + let res = i.try_into_logical_plan(ctx, extension_codec)?; input_plans.push(res); } @@ -744,7 +746,7 @@ impl AsLogicalPlan for LogicalPlanNode { LogicalPlanType::Extension(LogicalExtensionNode { node, inputs }) => { let mut input_plans: Vec = vec![]; for i in inputs.iter() { - let res = i.try_into_logical_plan(ctx, extension_codec).await?; + let res = i.try_into_logical_plan(ctx, extension_codec)?; input_plans.push(res); }