From 08ec41b71ce1e118fdc9416356e81dc7235ab029 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 6 Mar 2021 11:15:27 +0100 Subject: [PATCH 1/5] Change flight server example to use DataFrame API --- rust/datafusion/examples/flight_server.rs | 21 ++++++++----------- rust/datafusion/src/dataframe.rs | 5 ++++- .../src/execution/dataframe_impl.rs | 9 +++++++- 3 files changed, 21 insertions(+), 14 deletions(-) diff --git a/rust/datafusion/examples/flight_server.rs b/rust/datafusion/examples/flight_server.rs index 75b470d7087..ba2854fdc5b 100644 --- a/rust/datafusion/examples/flight_server.rs +++ b/rust/datafusion/examples/flight_server.rs @@ -21,9 +21,9 @@ use futures::Stream; use tonic::transport::Server; use tonic::{Request, Response, Status, Streaming}; +use datafusion::datasource::parquet::ParquetTable; use datafusion::datasource::TableProvider; use datafusion::prelude::*; -use datafusion::{datasource::parquet::ParquetTable, physical_plan::collect}; use arrow_flight::{ flight_service_server::FlightService, flight_service_server::FlightServiceServer, @@ -80,7 +80,7 @@ impl FlightService for FlightServiceImpl { request: Request, ) -> Result, Status> { let ticket = request.into_inner(); - match String::from_utf8(ticket.ticket.to_vec()) { + match std::str::from_utf8(&ticket.ticket) { Ok(sql) => { println!("do_get: {}", sql); @@ -96,26 +96,23 @@ impl FlightService for FlightServiceImpl { ) .map_err(|e| to_tonic_err(&e))?; - // create the query plan - let plan = ctx - .create_logical_plan(&sql) - .and_then(|plan| ctx.optimize(&plan)) - .and_then(|plan| ctx.create_physical_plan(&plan)) - .map_err(|e| to_tonic_err(&e))?; + // create the DataFrame + let df = ctx.sql(sql).map_err(|e| to_tonic_err(&e))?; // execute the query - let results = - collect(plan.clone()).await.map_err(|e| to_tonic_err(&e))?; + let results = df.collect().await.map_err(|e| to_tonic_err(&e))?; if results.is_empty() { return Err(Status::internal("There were no results from ticket")); } + let physical_plan = + df.to_physical_plan().map_err(|e| to_tonic_err(&e))?; + // add an initial FlightData message that sends schema let options = arrow::ipc::writer::IpcWriteOptions::default(); - let schema = plan.schema(); let schema_flight_data = arrow_flight::utils::flight_data_from_arrow_schema( - schema.as_ref(), + physical_plan.schema().as_ref(), &options, ); diff --git a/rust/datafusion/src/dataframe.rs b/rust/datafusion/src/dataframe.rs index ceb5ca65f5e..13b504b9691 100644 --- a/rust/datafusion/src/dataframe.rs +++ b/rust/datafusion/src/dataframe.rs @@ -17,11 +17,11 @@ //! DataFrame API for building and executing query plans. -use crate::arrow::record_batch::RecordBatch; use crate::error::Result; use crate::logical_plan::{ DFSchema, Expr, FunctionRegistry, JoinType, LogicalPlan, Partitioning, }; +use crate::{arrow::record_batch::RecordBatch, physical_plan::ExecutionPlan}; use std::sync::Arc; use async_trait::async_trait; @@ -240,6 +240,9 @@ pub trait DataFrame: Send + Sync { /// Return the logical plan represented by this DataFrame. fn to_logical_plan(&self) -> LogicalPlan; + /// Return the physical plan represented by this DataFrame. + fn to_physical_plan(&self) -> Result>; + /// Return a DataFrame with the explanation of its plan so far. /// /// ``` diff --git a/rust/datafusion/src/execution/dataframe_impl.rs b/rust/datafusion/src/execution/dataframe_impl.rs index 3a0931d8ccc..9667b169810 100644 --- a/rust/datafusion/src/execution/dataframe_impl.rs +++ b/rust/datafusion/src/execution/dataframe_impl.rs @@ -19,13 +19,13 @@ use std::sync::{Arc, Mutex}; -use crate::arrow::record_batch::RecordBatch; use crate::error::Result; use crate::execution::context::{ExecutionContext, ExecutionContextState}; use crate::logical_plan::{ col, DFSchema, Expr, FunctionRegistry, JoinType, LogicalPlan, LogicalPlanBuilder, Partitioning, }; +use crate::{arrow::record_batch::RecordBatch, physical_plan::ExecutionPlan}; use crate::{ dataframe::*, physical_plan::{collect, collect_partitioned}, @@ -166,6 +166,13 @@ impl DataFrame for DataFrameImpl { let registry = self.ctx_state.lock().unwrap().clone(); Arc::new(registry) } + + fn to_physical_plan(&self) -> Result> { + let state = self.ctx_state.lock().unwrap().clone(); + let ctx = ExecutionContext::from(Arc::new(Mutex::new(state))); + + ctx.create_physical_plan(&self.to_logical_plan()) + } } #[cfg(test)] From db7a88c0b332986555dbbcc527190fecf98dcbd9 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 6 Mar 2021 11:32:07 +0100 Subject: [PATCH 2/5] Remove dataframe method --- rust/datafusion/examples/flight_server.rs | 5 +++-- rust/datafusion/src/dataframe.rs | 3 --- rust/datafusion/src/execution/dataframe_impl.rs | 7 ------- 3 files changed, 3 insertions(+), 12 deletions(-) diff --git a/rust/datafusion/examples/flight_server.rs b/rust/datafusion/examples/flight_server.rs index ba2854fdc5b..eb526e415e3 100644 --- a/rust/datafusion/examples/flight_server.rs +++ b/rust/datafusion/examples/flight_server.rs @@ -105,8 +105,9 @@ impl FlightService for FlightServiceImpl { return Err(Status::internal("There were no results from ticket")); } - let physical_plan = - df.to_physical_plan().map_err(|e| to_tonic_err(&e))?; + let physical_plan = ctx + .create_physical_plan(&df.to_logical_plan()) + .map_err(|e| to_tonic_err(&e))?; // add an initial FlightData message that sends schema let options = arrow::ipc::writer::IpcWriteOptions::default(); diff --git a/rust/datafusion/src/dataframe.rs b/rust/datafusion/src/dataframe.rs index 13b504b9691..410b36a5e86 100644 --- a/rust/datafusion/src/dataframe.rs +++ b/rust/datafusion/src/dataframe.rs @@ -240,9 +240,6 @@ pub trait DataFrame: Send + Sync { /// Return the logical plan represented by this DataFrame. fn to_logical_plan(&self) -> LogicalPlan; - /// Return the physical plan represented by this DataFrame. - fn to_physical_plan(&self) -> Result>; - /// Return a DataFrame with the explanation of its plan so far. /// /// ``` diff --git a/rust/datafusion/src/execution/dataframe_impl.rs b/rust/datafusion/src/execution/dataframe_impl.rs index 9667b169810..635bbb41d15 100644 --- a/rust/datafusion/src/execution/dataframe_impl.rs +++ b/rust/datafusion/src/execution/dataframe_impl.rs @@ -166,13 +166,6 @@ impl DataFrame for DataFrameImpl { let registry = self.ctx_state.lock().unwrap().clone(); Arc::new(registry) } - - fn to_physical_plan(&self) -> Result> { - let state = self.ctx_state.lock().unwrap().clone(); - let ctx = ExecutionContext::from(Arc::new(Mutex::new(state))); - - ctx.create_physical_plan(&self.to_logical_plan()) - } } #[cfg(test)] From b0e65d44326fe5dd2dd342c41654c2b619e7f520 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 6 Mar 2021 11:34:02 +0100 Subject: [PATCH 3/5] Some further cleanup --- rust/datafusion/examples/flight_server.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/rust/datafusion/examples/flight_server.rs b/rust/datafusion/examples/flight_server.rs index eb526e415e3..2df1b8f01cf 100644 --- a/rust/datafusion/examples/flight_server.rs +++ b/rust/datafusion/examples/flight_server.rs @@ -94,20 +94,20 @@ impl FlightService for FlightServiceImpl { "alltypes_plain", &format!("{}/alltypes_plain.parquet", testdata), ) - .map_err(|e| to_tonic_err(&e))?; + .map_err(to_tonic_err)?; // create the DataFrame - let df = ctx.sql(sql).map_err(|e| to_tonic_err(&e))?; + let df = ctx.sql(sql).map_err(to_tonic_err)?; // execute the query - let results = df.collect().await.map_err(|e| to_tonic_err(&e))?; + let results = df.collect().await.map_err(to_tonic_err)?; if results.is_empty() { return Err(Status::internal("There were no results from ticket")); } let physical_plan = ctx .create_physical_plan(&df.to_logical_plan()) - .map_err(|e| to_tonic_err(&e))?; + .map_err(to_tonic_err)?; // add an initial FlightData message that sends schema let options = arrow::ipc::writer::IpcWriteOptions::default(); @@ -195,7 +195,7 @@ impl FlightService for FlightServiceImpl { } } -fn to_tonic_err(e: &datafusion::error::DataFusionError) -> Status { +fn to_tonic_err(e: datafusion::error::DataFusionError) -> Status { Status::internal(format!("{:?}", e)) } From b86061b0342ff52231550eb2b00951d61a576101 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sat, 6 Mar 2021 11:35:06 +0100 Subject: [PATCH 4/5] Revert --- rust/datafusion/src/dataframe.rs | 2 +- rust/datafusion/src/execution/dataframe_impl.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/src/dataframe.rs b/rust/datafusion/src/dataframe.rs index 410b36a5e86..ceb5ca65f5e 100644 --- a/rust/datafusion/src/dataframe.rs +++ b/rust/datafusion/src/dataframe.rs @@ -17,11 +17,11 @@ //! DataFrame API for building and executing query plans. +use crate::arrow::record_batch::RecordBatch; use crate::error::Result; use crate::logical_plan::{ DFSchema, Expr, FunctionRegistry, JoinType, LogicalPlan, Partitioning, }; -use crate::{arrow::record_batch::RecordBatch, physical_plan::ExecutionPlan}; use std::sync::Arc; use async_trait::async_trait; diff --git a/rust/datafusion/src/execution/dataframe_impl.rs b/rust/datafusion/src/execution/dataframe_impl.rs index 635bbb41d15..3a0931d8ccc 100644 --- a/rust/datafusion/src/execution/dataframe_impl.rs +++ b/rust/datafusion/src/execution/dataframe_impl.rs @@ -19,13 +19,13 @@ use std::sync::{Arc, Mutex}; +use crate::arrow::record_batch::RecordBatch; use crate::error::Result; use crate::execution::context::{ExecutionContext, ExecutionContextState}; use crate::logical_plan::{ col, DFSchema, Expr, FunctionRegistry, JoinType, LogicalPlan, LogicalPlanBuilder, Partitioning, }; -use crate::{arrow::record_batch::RecordBatch, physical_plan::ExecutionPlan}; use crate::{ dataframe::*, physical_plan::{collect, collect_partitioned}, From dab0ab26c13652133a0b6a7f2a61b1abebbc9fb5 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Sun, 14 Mar 2021 12:22:33 +0100 Subject: [PATCH 5/5] Simplify example to create schema based on dataframe --- rust/datafusion/examples/flight_server.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/rust/datafusion/examples/flight_server.rs b/rust/datafusion/examples/flight_server.rs index 2df1b8f01cf..79660dd1871 100644 --- a/rust/datafusion/examples/flight_server.rs +++ b/rust/datafusion/examples/flight_server.rs @@ -105,15 +105,11 @@ impl FlightService for FlightServiceImpl { return Err(Status::internal("There were no results from ticket")); } - let physical_plan = ctx - .create_physical_plan(&df.to_logical_plan()) - .map_err(to_tonic_err)?; - // add an initial FlightData message that sends schema let options = arrow::ipc::writer::IpcWriteOptions::default(); let schema_flight_data = arrow_flight::utils::flight_data_from_arrow_schema( - physical_plan.schema().as_ref(), + &df.schema().clone().into(), &options, );