Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/datasource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,5 +86,5 @@ pub trait TableProvider: Sync + Send {
#[async_trait]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can probably remove #[async_trait] as well

pub trait TableProviderFactory: Sync + Send {
/// Create a TableProvider with the given url
async fn create(&self, url: &str) -> Result<Arc<dyn TableProvider>>;
fn create(&self, url: &str) -> Result<Arc<dyn TableProvider>>;
}
2 changes: 1 addition & 1 deletion datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ impl SessionContext {
cmd.file_type
))
})?;
let table = (*factory).create(cmd.location.as_str()).await?;
let table = (*factory).create(cmd.location.as_str())?;
self.register_table(cmd.name.as_str(), table)?;
let plan = LogicalPlanBuilder::empty(false).build()?;
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
Expand Down
5 changes: 1 addition & 4 deletions datafusion/core/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,7 @@ pub struct TestTableFactory {}

#[async_trait]
impl TableProviderFactory for TestTableFactory {
async fn create(
&self,
url: &str,
) -> datafusion_common::Result<Arc<dyn TableProvider>> {
fn create(&self, url: &str) -> datafusion_common::Result<Arc<dyn TableProvider>> {
Ok(Arc::new(TestTableProvider {
url: url.to_string(),
}))
Expand Down
2 changes: 1 addition & 1 deletion datafusion/proto/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ async fn main() -> Result<()> {
?;
let plan = ctx.table("t1")?.to_logical_plan()?;
let bytes = logical_plan_to_bytes(&plan)?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/proto/examples/plan_serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async fn main() -> Result<()> {
.await?;
let plan = ctx.table("t1")?.to_logical_plan()?;
let bytes = logical_plan_to_bytes(&plan)?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));
Ok(())
}
23 changes: 10 additions & 13 deletions datafusion/proto/src/bytes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,35 +136,32 @@ pub fn logical_plan_to_bytes_with_extension_codec(

/// Deserialize a LogicalPlan from json
#[cfg(feature = "json")]
pub async fn logical_plan_from_json(
json: &str,
ctx: &SessionContext,
) -> Result<LogicalPlan> {
pub fn logical_plan_from_json(json: &str, ctx: &SessionContext) -> Result<LogicalPlan> {
let back: protobuf::LogicalPlanNode = serde_json::from_str(json)
.map_err(|e| DataFusionError::Plan(format!("Error serializing plan: {}", e)))?;
let extension_codec = DefaultExtensionCodec {};
back.try_into_logical_plan(ctx, &extension_codec).await
back.try_into_logical_plan(ctx, &extension_codec)
}

/// Deserialize a LogicalPlan from bytes
pub async fn logical_plan_from_bytes(
pub fn logical_plan_from_bytes(
bytes: &[u8],
ctx: &SessionContext,
) -> Result<LogicalPlan> {
let extension_codec = DefaultExtensionCodec {};
logical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec).await
logical_plan_from_bytes_with_extension_codec(bytes, ctx, &extension_codec)
}

/// Deserialize a LogicalPlan from bytes
pub async fn logical_plan_from_bytes_with_extension_codec(
pub fn logical_plan_from_bytes_with_extension_codec(
bytes: &[u8],
ctx: &SessionContext,
extension_codec: &dyn LogicalExtensionCodec,
) -> Result<LogicalPlan> {
let protobuf = protobuf::LogicalPlanNode::decode(bytes).map_err(|e| {
DataFusionError::Plan(format!("Error decoding expr as protobuf: {}", e))
})?;
protobuf.try_into_logical_plan(ctx, extension_codec).await
protobuf.try_into_logical_plan(ctx, extension_codec)
}

#[derive(Debug)]
Expand All @@ -189,7 +186,7 @@ impl LogicalExtensionCodec for DefaultExtensionCodec {
))
}

async fn try_decode_table_provider(
fn try_decode_table_provider(
&self,
_buf: &[u8],
_schema: SchemaRef,
Expand Down Expand Up @@ -243,12 +240,12 @@ mod test {
assert_eq!(actual, expected);
}

#[tokio::test]
#[test]
#[cfg(feature = "json")]
async fn json_to_plan() {
fn json_to_plan() {
let input = r#"{"emptyRelation":{}}"#.to_string();
let ctx = SessionContext::new();
let actual = logical_plan_from_json(&input, &ctx).await.unwrap();
let actual = logical_plan_from_json(&input, &ctx).unwrap();
let result = matches!(actual, LogicalPlan::EmptyRelation(_));
assert!(result, "Should parse empty relation");
}
Expand Down
17 changes: 8 additions & 9 deletions datafusion/proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ mod roundtrip_tests {
let bytes =
logical_plan_to_bytes_with_extension_codec(&topk_plan, &extension_codec)?;
let logical_round_trip =
logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &extension_codec)
.await?;
logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &extension_codec)?;
assert_eq!(
format!("{:?}", topk_plan),
format!("{:?}", logical_round_trip)
Expand Down Expand Up @@ -172,7 +171,7 @@ mod roundtrip_tests {
))
}

async fn try_decode_table_provider(
fn try_decode_table_provider(
&self,
buf: &[u8],
_schema: SchemaRef,
Expand All @@ -189,7 +188,7 @@ mod roundtrip_tests {
.get("testtable")
.expect("Unable to find testtable factory")
.clone();
let provider = (*factory).create(msg.url.as_str()).await?;
let provider = (*factory).create(msg.url.as_str())?;
Ok(provider)
}

Expand Down Expand Up @@ -229,7 +228,7 @@ mod roundtrip_tests {
let scan = ctx.table("t")?.to_logical_plan()?;
let bytes = logical_plan_to_bytes_with_extension_codec(&scan, &codec)?;
let logical_round_trip =
logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &codec).await?;
logical_plan_from_bytes_with_extension_codec(&bytes, &ctx, &codec)?;
assert_eq!(format!("{:?}", scan), format!("{:?}", logical_round_trip));
Ok(())
}
Expand Down Expand Up @@ -257,7 +256,7 @@ mod roundtrip_tests {
println!("{:?}", plan);

let bytes = logical_plan_to_bytes(&plan)?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));

Ok(())
Expand All @@ -270,7 +269,7 @@ mod roundtrip_tests {
.await?;
let plan = ctx.table("t1")?.to_logical_plan()?;
let bytes = logical_plan_to_bytes(&plan)?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));
Ok(())
}
Expand All @@ -284,7 +283,7 @@ mod roundtrip_tests {
.await?;
let plan = ctx.sql("SELECT * FROM view_t1").await?.to_logical_plan()?;
let bytes = logical_plan_to_bytes(&plan)?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx).await?;
let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?;
assert_eq!(format!("{:?}", plan), format!("{:?}", logical_round_trip));
Ok(())
}
Expand Down Expand Up @@ -431,7 +430,7 @@ mod roundtrip_tests {
}
}

async fn try_decode_table_provider(
fn try_decode_table_provider(
&self,
_buf: &[u8],
_schema: SchemaRef,
Expand Down
24 changes: 13 additions & 11 deletions datafusion/proto/src/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub trait AsLogicalPlan: Debug + Send + Sync + Clone {
B: BufMut,
Self: Sized;

async fn try_into_logical_plan(
fn try_into_logical_plan(
&self,
ctx: &SessionContext,
extension_codec: &dyn LogicalExtensionCodec,
Expand Down Expand Up @@ -130,7 +130,7 @@ pub trait LogicalExtensionCodec: Debug + Send + Sync {
buf: &mut Vec<u8>,
) -> Result<(), DataFusionError>;

async fn try_decode_table_provider(
fn try_decode_table_provider(
&self,
buf: &[u8],
schema: SchemaRef,
Expand Down Expand Up @@ -170,7 +170,7 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
))
}

async fn try_decode_table_provider(
fn try_decode_table_provider(
&self,
_buf: &[u8],
_schema: SchemaRef,
Expand All @@ -196,7 +196,7 @@ impl LogicalExtensionCodec for DefaultLogicalExtensionCodec {
macro_rules! into_logical_plan {
($PB:expr, $CTX:expr, $CODEC:expr) => {{
if let Some(field) = $PB.as_ref() {
field.as_ref().try_into_logical_plan($CTX, $CODEC).await
field.as_ref().try_into_logical_plan($CTX, $CODEC)
} else {
Err(proto_error("Missing required field in protobuf"))
}
Expand Down Expand Up @@ -301,7 +301,7 @@ impl AsLogicalPlan for LogicalPlanNode {
})
}

async fn try_into_logical_plan(
fn try_into_logical_plan(
&self,
ctx: &SessionContext,
extension_codec: &dyn LogicalExtensionCodec,
Expand Down Expand Up @@ -487,9 +487,11 @@ impl AsLogicalPlan for LogicalPlanNode {
.iter()
.map(|expr| parse_expr(expr, ctx))
.collect::<Result<Vec<_>, _>>()?;
let provider = extension_codec
.try_decode_table_provider(&scan.custom_table_data, schema, ctx)
.await?;
let provider = extension_codec.try_decode_table_provider(
&scan.custom_table_data,
schema,
ctx,
)?;

LogicalPlanBuilder::scan_with_filters(
&scan.table_name,
Expand Down Expand Up @@ -591,7 +593,7 @@ impl AsLogicalPlan for LogicalPlanNode {
.input.clone().ok_or_else(|| DataFusionError::Internal(String::from(
"Protobuf deserialization error, CreateViewNode has invalid LogicalPlan input.",
)))?
.try_into_logical_plan(ctx, extension_codec).await?;
.try_into_logical_plan(ctx, extension_codec)?;
let definition = if !create_view.definition.is_empty() {
Some(create_view.definition.clone())
} else {
Expand Down Expand Up @@ -716,7 +718,7 @@ impl AsLogicalPlan for LogicalPlanNode {
LogicalPlanType::Union(union) => {
let mut input_plans: Vec<LogicalPlan> = vec![];
for i in union.inputs.iter() {
let res = i.try_into_logical_plan(ctx, extension_codec).await?;
let res = i.try_into_logical_plan(ctx, extension_codec)?;
input_plans.push(res);
}

Expand Down Expand Up @@ -744,7 +746,7 @@ impl AsLogicalPlan for LogicalPlanNode {
LogicalPlanType::Extension(LogicalExtensionNode { node, inputs }) => {
let mut input_plans: Vec<LogicalPlan> = vec![];
for i in inputs.iter() {
let res = i.try_into_logical_plan(ctx, extension_codec).await?;
let res = i.try_into_logical_plan(ctx, extension_codec)?;
input_plans.push(res);
}

Expand Down