From 6beb4eafbdfa11915dbb6395c0ecefbddaefa2b3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 28 Jan 2020 17:21:31 -0700 Subject: [PATCH 01/16] WIP example Flight server for DataFusion --- rust/datafusion/Cargo.toml | 6 + rust/datafusion/examples/flight-server.rs | 159 ++++++++++++++++++++++ 2 files changed, 165 insertions(+) create mode 100644 rust/datafusion/examples/flight-server.rs diff --git a/rust/datafusion/Cargo.toml b/rust/datafusion/Cargo.toml index 14fc4e54548..f9f9f5d717f 100644 --- a/rust/datafusion/Cargo.toml +++ b/rust/datafusion/Cargo.toml @@ -56,6 +56,12 @@ crossbeam = "0.7.1" [dev-dependencies] criterion = "0.2.0" tempdir = "0.3.7" +futures = "0.3" +prost = "0.5" +tokio = { version = "0.2", features = ["full"] } +tonic = "0.1.1" +flatbuffers = "0.6.0" +arrow-flight = { path = "../arrow-flight", version = "0.16.0-SNAPSHOT" } [[bench]] name = "aggregate_query_sql" diff --git a/rust/datafusion/examples/flight-server.rs b/rust/datafusion/examples/flight-server.rs new file mode 100644 index 00000000000..f68a8311eb8 --- /dev/null +++ b/rust/datafusion/examples/flight-server.rs @@ -0,0 +1,159 @@ +use std::pin::Pin; + +use futures::Stream; +use tonic::transport::Server; +use tonic::{Request, Response, Status, Streaming}; + +use datafusion::execution::context::ExecutionContext; + +use arrow::record_batch::RecordBatch; +use flight::{ + flight_service_server::FlightService, flight_service_server::FlightServiceServer, + Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, + HandshakeRequest, HandshakeResponse, PutResult, SchemaResult, Ticket, +}; + +#[derive(Clone)] +pub struct FlightServiceImpl {} + +#[tonic::async_trait] +impl FlightService for FlightServiceImpl { + type HandshakeStream = Pin< + Box> + Send + Sync + 'static>, + >; + type ListFlightsStream = + Pin> + Send + Sync + 'static>>; + type DoGetStream = + Pin> + Send + Sync + 'static>>; + type DoPutStream = + Pin> + Send + Sync + 'static>>; + type DoActionStream = Pin< 
+ Box> + Send + Sync + 'static>, + >; + type ListActionsStream = + Pin> + Send + Sync + 'static>>; + + async fn do_get( + &self, + request: Request, + ) -> Result, Status> { + let ticket = request.into_inner(); + match String::from_utf8(ticket.ticket.to_vec()) { + Ok(sql) => { + println!("do_get: {}", sql); + + // create local execution context + let mut ctx = ExecutionContext::new(); + + ctx.register_parquet("alltypes_plain", "alltypes_plain.snappy.parquet") + .unwrap(); + + // create the query plan + let plan = ctx + .create_logical_plan(&sql) + .map_err(|e| to_tonic_err(&e))?; + let plan = ctx.optimize(&plan).map_err(|e| to_tonic_err(&e))?; + let plan = ctx + .create_physical_plan(&plan, 1024 * 1024) + .map_err(|e| to_tonic_err(&e))?; + + //TODO make this async + + // execute the query + let results = ctx.collect(plan.as_ref()).map_err(|e| to_tonic_err(&e))?; + + let flights: Vec> = + results.iter().map(|batch| to_flight_data(batch)).collect(); + + let output = futures::stream::iter(flights); + + Ok(Response::new(Box::pin(output) as Self::DoGetStream)) + } + Err(e) => Err(Status::unimplemented(format!("Invalid ticket: {:?}", e))), + } + } + + async fn handshake( + &self, + _request: Request>, + ) -> Result, Status> { + Err(Status::unimplemented("Not yet implemented")) + } + + async fn list_flights( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("Not yet implemented")) + } + + async fn get_flight_info( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("Not yet implemented")) + } + + async fn get_schema( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("Not yet implemented")) + } + + async fn do_put( + &self, + _request: Request>, + ) -> Result, Status> { + Err(Status::unimplemented("Not yet implemented")) + } + + async fn do_action( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("Not yet implemented")) + } + + async fn 
list_actions( + &self, + _request: Request, + ) -> Result, Status> { + Err(Status::unimplemented("Not yet implemented")) + } +} + +fn to_flight_data(_batch: &RecordBatch) -> Result { + //TODO implement .. need help on how to encode the batches using IPC here + + Ok(FlightData { + flight_descriptor: None, + app_metadata: vec![], + /// Header for message data as described in Message.fbs::Message + data_header: vec![], + /// The actual batch of Arrow data. Preferably handled with minimal-copies + /// coming last in the definition to help with sidecar patterns (it is + /// expected that some implementations will fetch this field off the wire + /// with specialized code to avoid extra memory copies). + /// + data_body: vec![], + }) +} + +fn to_tonic_err(e: &datafusion::error::ExecutionError) -> Status { + Status::unimplemented(format!("{:?}", e)) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let addr = "0.0.0.0:50051".parse()?; + let service = FlightServiceImpl {}; + + let svc = FlightServiceServer::new(service); + + println!("Listening on {:?}", addr); + + Server::builder().add_service(svc).serve(addr).await?; + + Ok(()) +} From 516b66dc83c353e5026e2c91ba9098419fe48797 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Wed, 29 Jan 2020 08:17:14 +0200 Subject: [PATCH 02/16] add helpers to convert record batch to flight data proto message --- rust/arrow/Cargo.toml | 4 ++- rust/arrow/src/flight/mod.rs | 38 +++++++++++++++++++++++ rust/arrow/src/ipc/writer.rs | 26 ++++++++++------ rust/arrow/src/lib.rs | 2 ++ rust/datafusion/Cargo.toml | 4 +-- rust/datafusion/examples/flight-server.rs | 28 ++++------------- 6 files changed, 67 insertions(+), 35 deletions(-) create mode 100644 rust/arrow/src/flight/mod.rs diff --git a/rust/arrow/Cargo.toml b/rust/arrow/Cargo.toml index 29829610889..57fc261985c 100644 --- a/rust/arrow/Cargo.toml +++ b/rust/arrow/Cargo.toml @@ -50,10 +50,12 @@ packed_simd = { version = "0.3.1", optional = true } chrono = "0.4" flatbuffers = 
"0.6.0" hex = "0.4" +arrow-flight = { path = "../arrow-flight", optional = true } [features] simd = ["packed_simd"] -default = ["simd"] +flight = ["arrow-flight"] +default = ["simd", "flight"] [dev-dependencies] criterion = "0.2" diff --git a/rust/arrow/src/flight/mod.rs b/rust/arrow/src/flight/mod.rs new file mode 100644 index 00000000000..a421e6edef5 --- /dev/null +++ b/rust/arrow/src/flight/mod.rs @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Utilities to assist with reading and writing Arrow data as Flight messages + +use flight::FlightData; + +use crate::ipc::writer; +use crate::record_batch::RecordBatch; + +/// Convert a `RecordBatch` to `FlightData by getting the header and body as bytes +impl From<&RecordBatch> for FlightData { + fn from(batch: &RecordBatch) -> Self { + let (header, body) = writer::record_batch_to_bytes(batch); + Self { + flight_descriptor: None, + app_metadata: vec![], + data_header: header, + data_body: body, + } + } +} + +// TODO: add more explicit conversion that expoess flight descriptor and metadata options diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs index 36c89c7a666..e79a344bbcf 100644 --- a/rust/arrow/src/ipc/writer.rs +++ b/rust/arrow/src/ipc/writer.rs @@ -266,13 +266,8 @@ fn write_padded_data( Ok(total_len as usize) } -/// Write a record batch to the writer, writing the message size before the message -/// if the record batch is being written to a stream -fn write_record_batch( - writer: &mut BufWriter, - batch: &RecordBatch, - is_stream: bool, -) -> Result<(usize, usize)> { +/// Write a `RecordBatch` into a tuple of bytes, one for the header (ipc::Message) and the other for the batch's data +pub(crate) fn record_batch_to_bytes(batch: &RecordBatch) -> (Vec, Vec) { let mut fbb = FlatBufferBuilder::new(); let mut nodes: Vec = vec![]; @@ -313,13 +308,24 @@ fn write_record_batch( let root = message.finish(); fbb.finish(root, None); let finished_data = fbb.finished_data(); + + (finished_data.to_vec(), arrow_data) +} + +/// Write a record batch to the writer, writing the message size before the message +/// if the record batch is being written to a stream +fn write_record_batch( + writer: &mut BufWriter, + batch: &RecordBatch, + is_stream: bool, +) -> Result<(usize, usize)> { + let (meta_data, arrow_data) = record_batch_to_bytes(batch); // write the length of data if writing to stream if is_stream { - let total_len: u32 = finished_data.len() as 
u32; + let total_len: u32 = meta_data.len() as u32; writer.write(&total_len.to_le_bytes()[..])?; } - let meta_written = - write_padded_data(writer, fbb.finished_data(), WriteDataType::Body)?; + let meta_written = write_padded_data(writer, &meta_data[..], WriteDataType::Body)?; let arrow_data_written = write_padded_data(writer, &arrow_data[..], WriteDataType::Body)?; Ok((meta_written, arrow_data_written)) diff --git a/rust/arrow/src/lib.rs b/rust/arrow/src/lib.rs index 899bb62f08a..4383922be09 100644 --- a/rust/arrow/src/lib.rs +++ b/rust/arrow/src/lib.rs @@ -33,6 +33,8 @@ pub mod compute; pub mod csv; pub mod datatypes; pub mod error; +#[cfg(feature = "flight")] +pub mod flight; pub mod ipc; pub mod json; pub mod memory; diff --git a/rust/datafusion/Cargo.toml b/rust/datafusion/Cargo.toml index f9f9f5d717f..3a81877e1ba 100644 --- a/rust/datafusion/Cargo.toml +++ b/rust/datafusion/Cargo.toml @@ -57,9 +57,9 @@ crossbeam = "0.7.1" criterion = "0.2.0" tempdir = "0.3.7" futures = "0.3" -prost = "0.5" +prost = "0.6" tokio = { version = "0.2", features = ["full"] } -tonic = "0.1.1" +tonic = "0.1" flatbuffers = "0.6.0" arrow-flight = { path = "../arrow-flight", version = "0.16.0-SNAPSHOT" } diff --git a/rust/datafusion/examples/flight-server.rs b/rust/datafusion/examples/flight-server.rs index f68a8311eb8..5431ff09e7c 100644 --- a/rust/datafusion/examples/flight-server.rs +++ b/rust/datafusion/examples/flight-server.rs @@ -6,7 +6,6 @@ use tonic::{Request, Response, Status, Streaming}; use datafusion::execution::context::ExecutionContext; -use arrow::record_batch::RecordBatch; use flight::{ flight_service_server::FlightService, flight_service_server::FlightServiceServer, Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo, @@ -62,14 +61,16 @@ impl FlightService for FlightServiceImpl { // execute the query let results = ctx.collect(plan.as_ref()).map_err(|e| to_tonic_err(&e))?; - let flights: Vec> = - results.iter().map(|batch| 
to_flight_data(batch)).collect(); + let flights: Vec> = results + .iter() + .map(|batch| Ok(FlightData::from(batch))) + .collect(); let output = futures::stream::iter(flights); Ok(Response::new(Box::pin(output) as Self::DoGetStream)) } - Err(e) => Err(Status::unimplemented(format!("Invalid ticket: {:?}", e))), + Err(e) => Err(Status::invalid_argument(format!("Invalid ticket: {:?}", e))), } } @@ -123,25 +124,8 @@ impl FlightService for FlightServiceImpl { } } -fn to_flight_data(_batch: &RecordBatch) -> Result { - //TODO implement .. need help on how to encode the batches using IPC here - - Ok(FlightData { - flight_descriptor: None, - app_metadata: vec![], - /// Header for message data as described in Message.fbs::Message - data_header: vec![], - /// The actual batch of Arrow data. Preferably handled with minimal-copies - /// coming last in the definition to help with sidecar patterns (it is - /// expected that some implementations will fetch this field off the wire - /// with specialized code to avoid extra memory copies). 
- /// - data_body: vec![], - }) -} - fn to_tonic_err(e: &datafusion::error::ExecutionError) -> Status { - Status::unimplemented(format!("{:?}", e)) + Status::internal(format!("{:?}", e)) } #[tokio::main] From 260f9cac151e30e9004ce490aa88d3227dc4a897 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Wed, 29 Jan 2020 09:26:54 +0200 Subject: [PATCH 03/16] fix license violation --- rust/datafusion/Cargo.toml | 2 +- rust/datafusion/examples/flight-server.rs | 17 +++++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/rust/datafusion/Cargo.toml b/rust/datafusion/Cargo.toml index 3a81877e1ba..f489d2b0d6d 100644 --- a/rust/datafusion/Cargo.toml +++ b/rust/datafusion/Cargo.toml @@ -58,7 +58,7 @@ criterion = "0.2.0" tempdir = "0.3.7" futures = "0.3" prost = "0.6" -tokio = { version = "0.2", features = ["full"] } +tokio = { version = "0.2", features = ["macros"] } tonic = "0.1" flatbuffers = "0.6.0" arrow-flight = { path = "../arrow-flight", version = "0.16.0-SNAPSHOT" } diff --git a/rust/datafusion/examples/flight-server.rs b/rust/datafusion/examples/flight-server.rs index 5431ff09e7c..84f88b06627 100644 --- a/rust/datafusion/examples/flight-server.rs +++ b/rust/datafusion/examples/flight-server.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + use std::pin::Pin; use futures::Stream; From 996f2a4a28e1f289ecc7fff40e697986417ee8a5 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 29 Jan 2020 06:05:48 -0700 Subject: [PATCH 04/16] Use PARQUET_TEST_DATA env var --- rust/datafusion/examples/flight-server.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/rust/datafusion/examples/flight-server.rs b/rust/datafusion/examples/flight-server.rs index 84f88b06627..880245077a0 100644 --- a/rust/datafusion/examples/flight-server.rs +++ b/rust/datafusion/examples/flight-server.rs @@ -61,8 +61,15 @@ impl FlightService for FlightServiceImpl { // create local execution context let mut ctx = ExecutionContext::new(); - ctx.register_parquet("alltypes_plain", "alltypes_plain.snappy.parquet") - .unwrap(); + let testdata = std::env::var("PARQUET_TEST_DATA") + .expect("PARQUET_TEST_DATA not defined"); + + // register parquet file with the execution context + ctx.register_parquet( + "alltypes_plain", + &format!("{}/alltypes_plain.parquet", testdata), + ) + .map_err(|e| to_tonic_err(&e))?; // create the query plan let plan = ctx From ad2e3b066dcdedd5dcf45179cf552f3be286614c Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Wed, 29 Jan 2020 17:32:24 +0200 Subject: [PATCH 05/16] send schema before batches --- rust/arrow/src/flight/mod.rs | 16 +++++++++++++++- rust/arrow/src/ipc/writer.rs | 11 +++++++---- rust/datafusion/examples/flight-server.rs | 14 +++++++++++++- 3 files changed, 35 insertions(+), 6 deletions(-) diff --git a/rust/arrow/src/flight/mod.rs b/rust/arrow/src/flight/mod.rs index a421e6edef5..90061df7269 100644 --- a/rust/arrow/src/flight/mod.rs +++ b/rust/arrow/src/flight/mod.rs @@ -19,10 +19,11 @@ use flight::FlightData; +use crate::datatypes::Schema; use crate::ipc::writer; use crate::record_batch::RecordBatch; -/// Convert a `RecordBatch` to `FlightData by getting the header 
and body as bytes +/// Convert a `RecordBatch` to `FlightData` by getting the header and body as bytes impl From<&RecordBatch> for FlightData { fn from(batch: &RecordBatch) -> Self { let (header, body) = writer::record_batch_to_bytes(batch); @@ -35,4 +36,17 @@ impl From<&RecordBatch> for FlightData { } } +/// Convert a `Schema` to `FlightData` by converting to an IPC message +impl From<&Schema> for FlightData { + fn from(schema: &Schema) -> Self { + let schema = writer::schema_to_bytes(schema); + Self { + flight_descriptor: None, + app_metadata: vec![], + data_header: schema, + data_body: vec![], + } + } +} + // TODO: add more explicit conversion that expoess flight descriptor and metadata options diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs index e79a344bbcf..c872c8286d7 100644 --- a/rust/arrow/src/ipc/writer.rs +++ b/rust/arrow/src/ipc/writer.rs @@ -209,8 +209,7 @@ impl Drop for StreamWriter { } } -/// Convert the schema to its IPC representation, and write it to the `writer` -fn write_schema(writer: &mut BufWriter, schema: &Schema) -> Result { +pub(crate) fn schema_to_bytes(schema: &Schema) -> Vec { let mut fbb = FlatBufferBuilder::new(); let schema = { let fb = ipc::convert::schema_to_fb_offset(&mut fbb, schema); @@ -227,9 +226,13 @@ fn write_schema(writer: &mut BufWriter, schema: &Schema) -> Result< fbb.finish(data, None); let data = fbb.finished_data(); - let written = write_padded_data(writer, data, WriteDataType::Header); + data.to_vec() +} - written +/// Convert the schema to its IPC representation, and write it to the `writer` +fn write_schema(writer: &mut BufWriter, schema: &Schema) -> Result { + let data = schema_to_bytes(schema); + write_padded_data(writer, &data[..], WriteDataType::Header) } /// The message type being written. This determines whether to write the data length or not. 
diff --git a/rust/datafusion/examples/flight-server.rs b/rust/datafusion/examples/flight-server.rs index 84f88b06627..6f000d6e5b0 100644 --- a/rust/datafusion/examples/flight-server.rs +++ b/rust/datafusion/examples/flight-server.rs @@ -77,12 +77,24 @@ impl FlightService for FlightServiceImpl { // execute the query let results = ctx.collect(plan.as_ref()).map_err(|e| to_tonic_err(&e))?; + if results.is_empty() { + return Err(Status::internal("There were no results from ticket")); + } - let flights: Vec> = results + // add an initial FlightData message that sends schema + // TODO: find a more ergonomic way of doing this + let schema = results[0].schema(); + let mut flights: Vec> = + vec![Ok(FlightData::from(schema.as_ref()))]; + + let mut batches: Vec> = results .iter() .map(|batch| Ok(FlightData::from(batch))) .collect(); + // append batch vector to schema vector, so that the first message sent is the schema + flights.append(&mut batches); + let output = futures::stream::iter(flights); Ok(Response::new(Box::pin(output) as Self::DoGetStream)) From ab28da82e43f15a07f13f745701f9f1eadef56b8 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 29 Jan 2020 08:53:54 -0700 Subject: [PATCH 06/16] get schema from query plan instead of from first batch --- rust/datafusion/examples/flight-server.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rust/datafusion/examples/flight-server.rs b/rust/datafusion/examples/flight-server.rs index 74214ed5876..05e440f69e0 100644 --- a/rust/datafusion/examples/flight-server.rs +++ b/rust/datafusion/examples/flight-server.rs @@ -89,8 +89,7 @@ impl FlightService for FlightServiceImpl { } // add an initial FlightData message that sends schema - // TODO: find a more ergonomic way of doing this - let schema = results[0].schema(); + let schema = plan.schema(); let mut flights: Vec> = vec![Ok(FlightData::from(schema.as_ref()))]; From e72c605456440d5072a00f59195374d90acadf08 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 
1 Feb 2020 12:06:40 -0700 Subject: [PATCH 07/16] add starting point for flight-client example --- rust/datafusion/examples/flight-client.rs | 44 +++++++++++++++++++++++ rust/datafusion/examples/flight-server.rs | 2 ++ 2 files changed, 46 insertions(+) create mode 100644 rust/datafusion/examples/flight-client.rs diff --git a/rust/datafusion/examples/flight-client.rs b/rust/datafusion/examples/flight-client.rs new file mode 100644 index 00000000000..80fd7f4fbf8 --- /dev/null +++ b/rust/datafusion/examples/flight-client.rs @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::pin::Pin; + +use futures::Stream; +use tonic::transport::Server; +use tonic::{Request, Response, Status, Streaming}; + +use flight::flight_service_client::FlightServiceClient; + +use flight::{ + FlightData, FlightDescriptor, FlightInfo, Ticket, +}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + + let mut client = FlightServiceClient::connect("http://localhost:50051").await?; + + let request = tonic::Request::new(Ticket { + ticket: "SELECT id FROM alltypes_plain".into(), + }); + + let response = client.do_get(request).await?; + + println!("RESPONSE={:?}", response); + + Ok(()) +} diff --git a/rust/datafusion/examples/flight-server.rs b/rust/datafusion/examples/flight-server.rs index 880245077a0..c653a3d8ec4 100644 --- a/rust/datafusion/examples/flight-server.rs +++ b/rust/datafusion/examples/flight-server.rs @@ -90,6 +90,8 @@ impl FlightService for FlightServiceImpl { .map(|batch| Ok(FlightData::from(batch))) .collect(); + println!("Returning {} flights", flights.len()); + let output = futures::stream::iter(flights); Ok(Response::new(Box::pin(output) as Self::DoGetStream)) From 03d2c84376cef389894932700e9200a901c65ae6 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 1 Feb 2020 12:11:01 -0700 Subject: [PATCH 08/16] client streams results --- rust/datafusion/examples/flight-client.rs | 10 +--------- rust/datafusion/examples/flight-server.rs | 4 ---- 2 files changed, 1 insertion(+), 13 deletions(-) diff --git a/rust/datafusion/examples/flight-client.rs b/rust/datafusion/examples/flight-client.rs index bc02ee5a905..2bfb889391c 100644 --- a/rust/datafusion/examples/flight-client.rs +++ b/rust/datafusion/examples/flight-client.rs @@ -15,17 +15,9 @@ // specific language governing permissions and limitations // under the License. 
-use std::pin::Pin; - -use futures::Stream; -use tonic::transport::Server; -use tonic::{Request, Response, Status, Streaming}; - use flight::flight_service_client::FlightServiceClient; -use flight::{ - FlightData, FlightDescriptor, FlightInfo, Ticket, -}; +use flight::Ticket; #[tokio::main] async fn main() -> Result<(), Box> { diff --git a/rust/datafusion/examples/flight-server.rs b/rust/datafusion/examples/flight-server.rs index 34ac66677fa..05e440f69e0 100644 --- a/rust/datafusion/examples/flight-server.rs +++ b/rust/datafusion/examples/flight-server.rs @@ -98,12 +98,8 @@ impl FlightService for FlightServiceImpl { .map(|batch| Ok(FlightData::from(batch))) .collect(); -<<<<<<< HEAD - println!("Returning {} flights", flights.len()); -======= // append batch vector to schema vector, so that the first message sent is the schema flights.append(&mut batches); ->>>>>>> ab28da82e43f15a07f13f745701f9f1eadef56b8 let output = futures::stream::iter(flights); From 8b419dab37cea63da32a643115d4abb0929f578c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 1 Feb 2020 12:21:42 -0700 Subject: [PATCH 09/16] update release test script --- dev/release/00-prepare-test.rb | 4 +++- rust/datafusion/examples/README.md | 28 ++++++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) create mode 100644 rust/datafusion/examples/README.md diff --git a/dev/release/00-prepare-test.rb b/dev/release/00-prepare-test.rb index e9894d0bc8a..47baa047b2c 100644 --- a/dev/release/00-prepare-test.rb +++ b/dev/release/00-prepare-test.rb @@ -275,8 +275,10 @@ def test_version_pre_tag "+version = \"#{@release_version}\""], ["-arrow = { path = \"../arrow\", version = \"#{@snapshot_version}\" }", "-parquet = { path = \"../parquet\", version = \"#{@snapshot_version}\" }", + "-arrow-flight = { path = \"../arrow-flight\", version = \"#{@snapshot_version}\" }", "+arrow = { path = \"../arrow\", version = \"#{@release_version}\" }", - "+parquet = { path = \"../parquet\", version = 
\"#{@release_version}\" }"] + "+parquet = { path = \"../parquet\", version = \"#{@release_version}\" }", + "+arrow-flight = { path = \"../arrow-flight\", version = \"#{@release_version}\" }"] ], }, { diff --git a/rust/datafusion/examples/README.md b/rust/datafusion/examples/README.md new file mode 100644 index 00000000000..163ef3d952b --- /dev/null +++ b/rust/datafusion/examples/README.md @@ -0,0 +1,28 @@ + + +# DataFusion Examples + +## Single Process + +The examples `csv_sql.rs` and `parquet_sql.rs` demonstrate building a query plan from a SQL statement and then executing the query plan against local CSV and Parquet files, respectively. + +## Distributed + +The `flight-client.rs` and `flight-server.rs` examples demonstrate how to run DataFusion as a standalone process and execute SQL queries from a client using the Flight protocol. \ No newline at end of file From 5ecea83b16f0343ff27a024385ac59859a830d57 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 1 Feb 2020 12:42:56 -0700 Subject: [PATCH 10/16] formatting --- rust/datafusion/examples/flight-client.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/rust/datafusion/examples/flight-client.rs b/rust/datafusion/examples/flight-client.rs index 2bfb889391c..e79b821fbec 100644 --- a/rust/datafusion/examples/flight-client.rs +++ b/rust/datafusion/examples/flight-client.rs @@ -21,7 +21,6 @@ use flight::Ticket; #[tokio::main] async fn main() -> Result<(), Box> { - let mut client = FlightServiceClient::connect("http://localhost:50051").await?; let request = tonic::Request::new(Ticket { From efe05aef5b09f25fcb4ae8141207e5f97c8f1b31 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 1 Feb 2020 12:49:11 -0700 Subject: [PATCH 11/16] update release test script --- dev/release/00-prepare-test.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dev/release/00-prepare-test.rb b/dev/release/00-prepare-test.rb index 47baa047b2c..5e38d0f1d73 100644 --- a/dev/release/00-prepare-test.rb +++ 
b/dev/release/00-prepare-test.rb @@ -275,9 +275,9 @@ def test_version_pre_tag "+version = \"#{@release_version}\""], ["-arrow = { path = \"../arrow\", version = \"#{@snapshot_version}\" }", "-parquet = { path = \"../parquet\", version = \"#{@snapshot_version}\" }", - "-arrow-flight = { path = \"../arrow-flight\", version = \"#{@snapshot_version}\" }", "+arrow = { path = \"../arrow\", version = \"#{@release_version}\" }", - "+parquet = { path = \"../parquet\", version = \"#{@release_version}\" }", + "+parquet = { path = \"../parquet\", version = \"#{@release_version}\" }"], + ["-arrow-flight = { path = \"../arrow-flight\", version = \"#{@snapshot_version}\" }", "+arrow-flight = { path = \"../arrow-flight\", version = \"#{@release_version}\" }"] ], }, From 31c894bbc565a6ba27331371c19ea0c681c7506f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 1 Feb 2020 12:57:21 -0700 Subject: [PATCH 12/16] update release test script --- dev/release/00-prepare-test.rb | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dev/release/00-prepare-test.rb b/dev/release/00-prepare-test.rb index 5e38d0f1d73..afabe88fa7e 100644 --- a/dev/release/00-prepare-test.rb +++ b/dev/release/00-prepare-test.rb @@ -460,7 +460,9 @@ def test_version_post_tag ["-arrow = { path = \"../arrow\", version = \"#{@release_version}\" }", "-parquet = { path = \"../parquet\", version = \"#{@release_version}\" }", "+arrow = { path = \"../arrow\", version = \"#{@next_snapshot_version}\" }", - "+parquet = { path = \"../parquet\", version = \"#{@next_snapshot_version}\" }"] + "+parquet = { path = \"../parquet\", version = \"#{@next_snapshot_version}\" }"], + ["-arrow-flight = { path = \"../arrow-flight\", version = \"#{@release_version}\" }", + "+arrow-flight = { path = \"../arrow-flight\", version = \"#{@next_snapshot_version}\" }"] ], }, { From 459bef386956e79282f59f4ef5f907d6d02ced8a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 1 Feb 2020 14:01:02 -0700 Subject: [PATCH 13/16] client 
parses schema from ipc batches --- rust/arrow/src/ipc/reader.rs | 6 ++++++ rust/datafusion/examples/flight-client.rs | 6 +++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/rust/arrow/src/ipc/reader.rs b/rust/arrow/src/ipc/reader.rs index 99173641d32..c7f20bbd9e1 100644 --- a/rust/arrow/src/ipc/reader.rs +++ b/rust/arrow/src/ipc/reader.rs @@ -29,6 +29,7 @@ use crate::compute::cast; use crate::datatypes::{DataType, IntervalUnit, Schema, SchemaRef}; use crate::error::{ArrowError, Result}; use crate::ipc; +use crate::ipc::convert::fb_to_schema; use crate::record_batch::{RecordBatch, RecordBatchReader}; use DataType::*; @@ -663,6 +664,11 @@ impl RecordBatchReader for StreamReader { } } +pub fn schema_from_bytes(bytes: &[u8]) -> Option { + let ipc = ipc::get_root_as_message(&bytes[..]); + ipc.header_as_schema().map(|schema| fb_to_schema(schema)) +} + #[cfg(test)] mod tests { use super::*; diff --git a/rust/datafusion/examples/flight-client.rs b/rust/datafusion/examples/flight-client.rs index e79b821fbec..8a34b7fce68 100644 --- a/rust/datafusion/examples/flight-client.rs +++ b/rust/datafusion/examples/flight-client.rs @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. +use arrow::ipc::reader; use flight::flight_service_client::FlightServiceClient; - use flight::Ticket; #[tokio::main] @@ -31,6 +31,10 @@ async fn main() -> Result<(), Box> { while let Some(batch) = stream.message().await? 
{ println!("BATCH = {:?}", batch); + + let schema = reader::schema_from_bytes(&batch.data_header); + + println!("SCHEMA = {:?}", schema); } Ok(()) From 1337b98904b46ba7373a8d9f830413d9332e790e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 1 Feb 2020 14:36:15 -0700 Subject: [PATCH 14/16] parse recordbatch --- rust/arrow/src/ipc/reader.rs | 10 +++++++ rust/datafusion/examples/flight-client.rs | 34 ++++++++++++++++++----- 2 files changed, 37 insertions(+), 7 deletions(-) diff --git a/rust/arrow/src/ipc/reader.rs b/rust/arrow/src/ipc/reader.rs index c7f20bbd9e1..205b5b82aa3 100644 --- a/rust/arrow/src/ipc/reader.rs +++ b/rust/arrow/src/ipc/reader.rs @@ -669,6 +669,16 @@ pub fn schema_from_bytes(bytes: &[u8]) -> Option { ipc.header_as_schema().map(|schema| fb_to_schema(schema)) } +pub fn recordbatch_from_bytes( + bytes: &[u8], + schema: Arc, +) -> Result> { + let ipc = ipc::get_root_as_message(&bytes[..]); + match ipc.header_as_record_batch() { + Some(batch) => read_record_batch(&bytes[..].to_vec(), batch, schema), + None => Ok(None), + } +} #[cfg(test)] mod tests { use super::*; diff --git a/rust/datafusion/examples/flight-client.rs b/rust/datafusion/examples/flight-client.rs index 8a34b7fce68..987d5cb4d67 100644 --- a/rust/datafusion/examples/flight-client.rs +++ b/rust/datafusion/examples/flight-client.rs @@ -15,9 +15,11 @@ // specific language governing permissions and limitations // under the License. +use arrow::datatypes::Schema; use arrow::ipc::reader; use flight::flight_service_client::FlightServiceClient; use flight::Ticket; +use std::sync::Arc; #[tokio::main] async fn main() -> Result<(), Box> { @@ -28,13 +30,31 @@ async fn main() -> Result<(), Box> { }); let mut stream = client.do_get(request).await?.into_inner(); - - while let Some(batch) = stream.message().await? 
{ - println!("BATCH = {:?}", batch); - - let schema = reader::schema_from_bytes(&batch.data_header); - - println!("SCHEMA = {:?}", schema); + let mut batch_schema: Option> = None; + + while let Some(flight_data) = stream.message().await? { + println!("FlightData = {:?}", flight_data); + + if let Some(schema) = reader::schema_from_bytes(&flight_data.data_header) { + println!("Schema = {:?}", schema); + batch_schema = Some(Arc::new(schema.clone())); + } + + match batch_schema { + Some(ref schema) => { + if let Some(record_batch) = reader::recordbatch_from_bytes( + &flight_data.data_header, + schema.clone(), + )? { + println!( + "record_batch has {} columns and {} rows", + record_batch.num_columns(), + record_batch.num_rows() + ); + } + } + None => {} + } } Ok(()) From 9c473389a256757ffb4fd045490fa9f31638d664 Mon Sep 17 00:00:00 2001 From: Neville Dipale Date: Sun, 2 Feb 2020 00:49:59 +0200 Subject: [PATCH 15/16] Complete flight client's record batch reader --- rust/arrow/src/flight/mod.rs | 33 ++++++++++++++- rust/arrow/src/ipc/convert.rs | 6 +++ rust/arrow/src/ipc/reader.rs | 18 +------- rust/datafusion/examples/flight-client.rs | 51 ++++++++++++----------- 4 files changed, 65 insertions(+), 43 deletions(-) diff --git a/rust/arrow/src/flight/mod.rs b/rust/arrow/src/flight/mod.rs index 90061df7269..6d09ca48c2a 100644 --- a/rust/arrow/src/flight/mod.rs +++ b/rust/arrow/src/flight/mod.rs @@ -17,10 +17,14 @@ //! 
Utilities to assist with reading and writing Arrow data as Flight messages +use std::convert::TryFrom; +use std::sync::Arc; + use flight::FlightData; use crate::datatypes::Schema; -use crate::ipc::writer; +use crate::error::{ArrowError, Result}; +use crate::ipc::{convert, reader, writer}; use crate::record_batch::RecordBatch; /// Convert a `RecordBatch` to `FlightData` by getting the header and body as bytes @@ -49,4 +53,31 @@ impl From<&Schema> for FlightData { } } +/// Try convert `FlightData` into an Arrow Schema +/// +/// Returns an error if the `FlightData` header is not a valid IPC schema +impl TryFrom<&FlightData> for Schema { + type Error = ArrowError; + fn try_from(data: &FlightData) -> Result { + convert::schema_from_bytes(&data.data_header[..]).ok_or(ArrowError::ParseError( + "Unable to convert flight data to Arrow schema".to_string(), + )) + } +} + +/// Convert a FlightData message to a RecordBatch +pub fn flight_data_to_batch( + data: &FlightData, + schema: Arc, +) -> Result> { + // check that the data_header is a record batch message + let message = crate::ipc::get_root_as_message(&data.data_header[..]); + let batch_header = message + .header_as_record_batch() + .ok_or(ArrowError::ParseError( + "Unable to convert flight data header to a record batch".to_string(), + ))?; + reader::read_record_batch(&data.data_body, batch_header, schema) +} + // TODO: add more explicit conversion that exposes flight descriptor and metadata options diff --git a/rust/arrow/src/ipc/convert.rs b/rust/arrow/src/ipc/convert.rs index a38975fbf1c..f53914e31d2 100644 --- a/rust/arrow/src/ipc/convert.rs +++ b/rust/arrow/src/ipc/convert.rs @@ -152,6 +152,12 @@ pub(crate) fn fb_to_schema(fb: ipc::Schema) -> Schema { Schema::new_with_metadata(fields, metadata) } +/// Deserialize an IPC message into a schema +pub(crate) fn schema_from_bytes(bytes: &[u8]) -> Option { + let ipc = ipc::get_root_as_message(bytes); + ipc.header_as_schema().map(|schema| fb_to_schema(schema)) +} + /// Get
the Arrow data type from the flatbuffer Field table pub(crate) fn get_data_type(field: ipc::Field) -> DataType { match field.type_type() { diff --git a/rust/arrow/src/ipc/reader.rs b/rust/arrow/src/ipc/reader.rs index 205b5b82aa3..25ba0cd3df1 100644 --- a/rust/arrow/src/ipc/reader.rs +++ b/rust/arrow/src/ipc/reader.rs @@ -29,7 +29,6 @@ use crate::compute::cast; use crate::datatypes::{DataType, IntervalUnit, Schema, SchemaRef}; use crate::error::{ArrowError, Result}; use crate::ipc; -use crate::ipc::convert::fb_to_schema; use crate::record_batch::{RecordBatch, RecordBatchReader}; use DataType::*; @@ -349,7 +348,7 @@ fn create_list_array( } /// Creates a record batch from binary data using the `ipc::RecordBatch` indexes and the `Schema` -fn read_record_batch( +pub(crate) fn read_record_batch( buf: &Vec, batch: ipc::RecordBatch, schema: Arc, @@ -664,21 +663,6 @@ impl RecordBatchReader for StreamReader { } } -pub fn schema_from_bytes(bytes: &[u8]) -> Option { - let ipc = ipc::get_root_as_message(&bytes[..]); - ipc.header_as_schema().map(|schema| fb_to_schema(schema)) -} - -pub fn recordbatch_from_bytes( - bytes: &[u8], - schema: Arc, -) -> Result> { - let ipc = ipc::get_root_as_message(&bytes[..]); - match ipc.header_as_record_batch() { - Some(batch) => read_record_batch(&bytes[..].to_vec(), batch, schema), - None => Ok(None), - } -} #[cfg(test)] mod tests { use super::*; diff --git a/rust/datafusion/examples/flight-client.rs b/rust/datafusion/examples/flight-client.rs index 987d5cb4d67..97db7934620 100644 --- a/rust/datafusion/examples/flight-client.rs +++ b/rust/datafusion/examples/flight-client.rs @@ -15,11 +15,14 @@ // specific language governing permissions and limitations // under the License. 
+use std::convert::TryFrom; +use std::sync::Arc; + +use arrow::array::Int32Array; use arrow::datatypes::Schema; -use arrow::ipc::reader; +use arrow::flight::flight_data_to_batch; use flight::flight_service_client::FlightServiceClient; use flight::Ticket; -use std::sync::Arc; #[tokio::main] async fn main() -> Result<(), Box> { @@ -30,31 +33,29 @@ async fn main() -> Result<(), Box> { }); let mut stream = client.do_get(request).await?.into_inner(); - let mut batch_schema: Option> = None; + // the schema should be the first message returned, else client should error + let flight_data = stream.message().await?.unwrap(); + // convert the FlightData header to an Arrow Schema + let schema = Arc::new(Schema::try_from(&flight_data)?); + println!("Schema: {:?}", schema); + + // all the remaining stream messages should be dictionary and record batches while let Some(flight_data) = stream.message().await? { - println!("FlightData = {:?}", flight_data); - - if let Some(schema) = reader::schema_from_bytes(&flight_data.data_header) { - println!("Schema = {:?}", schema); - batch_schema = Some(Arc::new(schema.clone())); - } - - match batch_schema { - Some(ref schema) => { - if let Some(record_batch) = reader::recordbatch_from_bytes( - &flight_data.data_header, - schema.clone(), - )?
{ - println!( - "record_batch has {} columns and {} rows", - record_batch.num_columns(), - record_batch.num_rows() - ); - } - } - None => {} - } + // the unwrap is infallible and thus safe + let record_batch = flight_data_to_batch(&flight_data, schema.clone())?.unwrap(); + + println!( + "record_batch has {} columns and {} rows", + record_batch.num_columns(), + record_batch.num_rows() + ); + let column = record_batch.column(0); + let column = column + .as_any() + .downcast_ref::() + .expect("Unable to get column"); + println!("Column 1: {:?}", column); } Ok(()) From 788feef8c63b848b14dbd7c90b139a259cc41395 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 1 Feb 2020 16:10:42 -0700 Subject: [PATCH 16/16] code cleanup --- rust/datafusion/examples/flight-server.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/rust/datafusion/examples/flight-server.rs b/rust/datafusion/examples/flight-server.rs index 05e440f69e0..e6cc22ac503 100644 --- a/rust/datafusion/examples/flight-server.rs +++ b/rust/datafusion/examples/flight-server.rs @@ -74,13 +74,9 @@ impl FlightService for FlightServiceImpl { // create the query plan let plan = ctx .create_logical_plan(&sql) + .and_then(|plan| ctx.optimize(&plan)) + .and_then(|plan| ctx.create_physical_plan(&plan, 1024 * 1024)) .map_err(|e| to_tonic_err(&e))?; - let plan = ctx.optimize(&plan).map_err(|e| to_tonic_err(&e))?; - let plan = ctx - .create_physical_plan(&plan, 1024 * 1024) - .map_err(|e| to_tonic_err(&e))?; - - //TODO make this async // execute the query let results = ctx.collect(plan.as_ref()).map_err(|e| to_tonic_err(&e))?;