diff --git a/dev/release/00-prepare-test.rb b/dev/release/00-prepare-test.rb
index e9894d0bc8a..afabe88fa7e 100644
--- a/dev/release/00-prepare-test.rb
+++ b/dev/release/00-prepare-test.rb
@@ -276,7 +276,9 @@ def test_version_pre_tag
         ["-arrow = { path = \"../arrow\", version = \"#{@snapshot_version}\" }",
          "-parquet = { path = \"../parquet\", version = \"#{@snapshot_version}\" }",
          "+arrow = { path = \"../arrow\", version = \"#{@release_version}\" }",
-         "+parquet = { path = \"../parquet\", version = \"#{@release_version}\" }"]
+         "+parquet = { path = \"../parquet\", version = \"#{@release_version}\" }"],
+        ["-arrow-flight = { path = \"../arrow-flight\", version = \"#{@snapshot_version}\" }",
+         "+arrow-flight = { path = \"../arrow-flight\", version = \"#{@release_version}\" }"]
       ],
     },
     {
@@ -458,7 +460,9 @@ def test_version_post_tag
         ["-arrow = { path = \"../arrow\", version = \"#{@release_version}\" }",
          "-parquet = { path = \"../parquet\", version = \"#{@release_version}\" }",
          "+arrow = { path = \"../arrow\", version = \"#{@next_snapshot_version}\" }",
-         "+parquet = { path = \"../parquet\", version = \"#{@next_snapshot_version}\" }"]
+         "+parquet = { path = \"../parquet\", version = \"#{@next_snapshot_version}\" }"],
+        ["-arrow-flight = { path = \"../arrow-flight\", version = \"#{@release_version}\" }",
+         "+arrow-flight = { path = \"../arrow-flight\", version = \"#{@next_snapshot_version}\" }"]
       ],
     },
     {
diff --git a/rust/arrow/Cargo.toml b/rust/arrow/Cargo.toml
index 29829610889..57fc261985c 100644
--- a/rust/arrow/Cargo.toml
+++ b/rust/arrow/Cargo.toml
@@ -50,10 +50,12 @@ packed_simd = { version = "0.3.1", optional = true }
 chrono = "0.4"
 flatbuffers = "0.6.0"
 hex = "0.4"
+arrow-flight = { path = "../arrow-flight", optional = true }
 
 [features]
 simd = ["packed_simd"]
-default = ["simd"]
+flight = ["arrow-flight"]
+default = ["simd", "flight"]
 
 [dev-dependencies]
 criterion = "0.2"
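Because `flight` joins the default feature set, downstream crates that want to avoid pulling in `arrow-flight` (and its tonic/prost dependency tree) have to opt out explicitly. A hypothetical consumer `Cargo.toml` sketch, not part of this patch; the version string mirrors the snapshot version used elsewhere in this diff:

```toml
[dependencies]
# Assumption: disabling default features and re-enabling only "simd"
# drops the optional arrow-flight dependency added above.
arrow = { version = "0.16.0-SNAPSHOT", default-features = false, features = ["simd"] }
```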
diff --git a/rust/arrow/src/flight/mod.rs b/rust/arrow/src/flight/mod.rs
new file mode 100644
index 00000000000..6d09ca48c2a
--- /dev/null
+++ b/rust/arrow/src/flight/mod.rs
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utilities to assist with reading and writing Arrow data as Flight messages
+
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+use flight::FlightData;
+
+use crate::datatypes::Schema;
+use crate::error::{ArrowError, Result};
+use crate::ipc::{convert, reader, writer};
+use crate::record_batch::RecordBatch;
+
+/// Convert a `RecordBatch` to `FlightData` by getting the header and body as bytes
+impl From<&RecordBatch> for FlightData {
+    fn from(batch: &RecordBatch) -> Self {
+        let (header, body) = writer::record_batch_to_bytes(batch);
+        Self {
+            flight_descriptor: None,
+            app_metadata: vec![],
+            data_header: header,
+            data_body: body,
+        }
+    }
+}
+
+/// Convert a `Schema` to `FlightData` by converting to an IPC message
+impl From<&Schema> for FlightData {
+    fn from(schema: &Schema) -> Self {
+        let schema = writer::schema_to_bytes(schema);
+        Self {
+            flight_descriptor: None,
+            app_metadata: vec![],
+            data_header: schema,
+            data_body: vec![],
+        }
+    }
+}
+
+/// Try to convert `FlightData` into an Arrow `Schema`
+///
+/// Returns an error if the `FlightData` header is not a valid IPC schema
+impl TryFrom<&FlightData> for Schema {
+    type Error = ArrowError;
+    fn try_from(data: &FlightData) -> Result<Self> {
+        convert::schema_from_bytes(&data.data_header[..]).ok_or(ArrowError::ParseError(
+            "Unable to convert flight data to Arrow schema".to_string(),
+        ))
+    }
+}
+
+/// Convert a `FlightData` message to a `RecordBatch`
+pub fn flight_data_to_batch(
+    data: &FlightData,
+    schema: Arc<Schema>,
+) -> Result<Option<RecordBatch>> {
+    // check that the data_header is a record batch message
+    let message = crate::ipc::get_root_as_message(&data.data_header[..]);
+    let batch_header = message
+        .header_as_record_batch()
+        .ok_or(ArrowError::ParseError(
+            "Unable to convert flight data header to a record batch".to_string(),
+        ))?;
+    reader::read_record_batch(&data.data_body, batch_header, schema)
+}
+
+// TODO: add more explicit conversions that expose flight descriptor and metadata options
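To illustrate the conversions above, here is a minimal round-trip sketch (not part of the patch): it assumes the `arrow` crate is built with the new `flight` feature and that `RecordBatch::try_new` and `Int32Array::from` are available as used elsewhere in the crate.

```rust
use std::convert::TryFrom;
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::flight::flight_data_to_batch;
use arrow::record_batch::RecordBatch;
use flight::FlightData;

fn main() -> arrow::error::Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // the schema and each batch become separate FlightData messages
    let schema_msg = FlightData::from(schema.as_ref());
    let batch_msg = FlightData::from(&batch);

    // a receiver decodes the schema first, then uses it to decode batches
    let decoded_schema = Arc::new(Schema::try_from(&schema_msg)?);
    let decoded = flight_data_to_batch(&batch_msg, decoded_schema)?
        .expect("header should be a record batch");
    assert_eq!(decoded.num_rows(), 3);
    Ok(())
}
```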
diff --git a/rust/arrow/src/ipc/convert.rs b/rust/arrow/src/ipc/convert.rs
index a38975fbf1c..f53914e31d2 100644
--- a/rust/arrow/src/ipc/convert.rs
+++ b/rust/arrow/src/ipc/convert.rs
@@ -152,6 +152,12 @@ pub(crate) fn fb_to_schema(fb: ipc::Schema) -> Schema {
     Schema::new_with_metadata(fields, metadata)
 }
 
+/// Deserialize an IPC message into a schema
+pub(crate) fn schema_from_bytes(bytes: &[u8]) -> Option<Schema> {
+    let ipc = ipc::get_root_as_message(bytes);
+    ipc.header_as_schema().map(|schema| fb_to_schema(schema))
+}
+
 /// Get the Arrow data type from the flatbuffer Field table
 pub(crate) fn get_data_type(field: ipc::Field) -> DataType {
     match field.type_type() {
diff --git a/rust/arrow/src/ipc/reader.rs b/rust/arrow/src/ipc/reader.rs
index 99173641d32..25ba0cd3df1 100644
--- a/rust/arrow/src/ipc/reader.rs
+++ b/rust/arrow/src/ipc/reader.rs
@@ -348,7 +348,7 @@ fn create_list_array(
 }
 
 /// Creates a record batch from binary data using the `ipc::RecordBatch` indexes and the `Schema`
-fn read_record_batch(
+pub(crate) fn read_record_batch(
     buf: &Vec<u8>,
     batch: ipc::RecordBatch,
     schema: Arc<Schema>,
diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs
index 36c89c7a666..c872c8286d7 100644
--- a/rust/arrow/src/ipc/writer.rs
+++ b/rust/arrow/src/ipc/writer.rs
@@ -209,8 +209,7 @@ impl Drop for StreamWriter {
     }
 }
 
-/// Convert the schema to its IPC representation, and write it to the `writer`
-fn write_schema<W: Write>(writer: &mut BufWriter<W>, schema: &Schema) -> Result<usize> {
+pub(crate) fn schema_to_bytes(schema: &Schema) -> Vec<u8> {
     let mut fbb = FlatBufferBuilder::new();
     let schema = {
         let fb = ipc::convert::schema_to_fb_offset(&mut fbb, schema);
@@ -227,9 +226,13 @@ fn write_schema<W: Write>(writer: &mut BufWriter<W>, schema: &Schema) -> Result<
     fbb.finish(data, None);
     let data = fbb.finished_data();
 
-    let written = write_padded_data(writer, data, WriteDataType::Header);
+    data.to_vec()
+}
 
-    written
+/// Convert the schema to its IPC representation, and write it to the `writer`
+fn write_schema<W: Write>(writer: &mut BufWriter<W>, schema: &Schema) -> Result<usize> {
+    let data = schema_to_bytes(schema);
+    write_padded_data(writer, &data[..], WriteDataType::Header)
 }
 
 /// The message type being written. This determines whether to write the data length or not.
@@ -266,13 +269,8 @@ fn write_padded_data(
     Ok(total_len as usize)
 }
 
-/// Write a record batch to the writer, writing the message size before the message
-/// if the record batch is being written to a stream
-fn write_record_batch<W: Write>(
-    writer: &mut BufWriter<W>,
-    batch: &RecordBatch,
-    is_stream: bool,
-) -> Result<(usize, usize)> {
+/// Write a `RecordBatch` into a tuple of bytes, one for the header (ipc::Message) and the other for the batch's data
+pub(crate) fn record_batch_to_bytes(batch: &RecordBatch) -> (Vec<u8>, Vec<u8>) {
     let mut fbb = FlatBufferBuilder::new();
 
     let mut nodes: Vec<ipc::FieldNode> = vec![];
@@ -313,13 +311,24 @@
     let root = message.finish();
     fbb.finish(root, None);
     let finished_data = fbb.finished_data();
+
+    (finished_data.to_vec(), arrow_data)
+}
+
+/// Write a record batch to the writer, writing the message size before the message
+/// if the record batch is being written to a stream
+fn write_record_batch<W: Write>(
+    writer: &mut BufWriter<W>,
+    batch: &RecordBatch,
+    is_stream: bool,
+) -> Result<(usize, usize)> {
+    let (meta_data, arrow_data) = record_batch_to_bytes(batch);
     // write the length of data if writing to stream
     if is_stream {
-        let total_len: u32 = finished_data.len() as u32;
+        let total_len: u32 = meta_data.len() as u32;
         writer.write(&total_len.to_le_bytes()[..])?;
     }
-    let meta_written =
-        write_padded_data(writer, fbb.finished_data(), WriteDataType::Body)?;
+    let meta_written = write_padded_data(writer, &meta_data[..], WriteDataType::Body)?;
     let arrow_data_written =
         write_padded_data(writer, &arrow_data[..], WriteDataType::Body)?;
     Ok((meta_written, arrow_data_written))
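The new `schema_to_bytes`/`schema_from_bytes` pair is `pub(crate)`, so any direct exercise of it has to live inside `rust/arrow`. A hypothetical crate-internal unit-test sketch (not in this patch) showing the intended symmetry:

```rust
#[cfg(test)]
mod tests {
    use crate::datatypes::{DataType, Field, Schema};
    use crate::ipc::{convert, writer};

    #[test]
    fn schema_ipc_roundtrip() {
        // serialize a schema as an IPC message, then parse it back
        let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
        let bytes = writer::schema_to_bytes(&schema);
        let decoded = convert::schema_from_bytes(&bytes[..])
            .expect("bytes should contain an IPC schema message");
        assert_eq!(schema, decoded);
    }
}
```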
diff --git a/rust/arrow/src/lib.rs b/rust/arrow/src/lib.rs
index 899bb62f08a..4383922be09 100644
--- a/rust/arrow/src/lib.rs
+++ b/rust/arrow/src/lib.rs
@@ -33,6 +33,8 @@ pub mod compute;
 pub mod csv;
 pub mod datatypes;
 pub mod error;
+#[cfg(feature = "flight")]
+pub mod flight;
 pub mod ipc;
 pub mod json;
 pub mod memory;
diff --git a/rust/datafusion/Cargo.toml b/rust/datafusion/Cargo.toml
index 14fc4e54548..f489d2b0d6d 100644
--- a/rust/datafusion/Cargo.toml
+++ b/rust/datafusion/Cargo.toml
@@ -56,6 +56,12 @@ crossbeam = "0.7.1"
 [dev-dependencies]
 criterion = "0.2.0"
 tempdir = "0.3.7"
+futures = "0.3"
+prost = "0.6"
+tokio = { version = "0.2", features = ["macros"] }
+tonic = "0.1"
+flatbuffers = "0.6.0"
+arrow-flight = { path = "../arrow-flight", version = "0.16.0-SNAPSHOT" }
 
 [[bench]]
 name = "aggregate_query_sql"
diff --git a/rust/datafusion/examples/README.md b/rust/datafusion/examples/README.md
new file mode 100644
index 00000000000..163ef3d952b
--- /dev/null
+++ b/rust/datafusion/examples/README.md
@@ -0,0 +1,28 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# DataFusion Examples
+
+## Single Process
+
+The examples `csv_sql.rs` and `parquet_sql.rs` demonstrate building a query plan from a SQL statement and then executing it against local CSV and Parquet files, respectively.
+
+## Distributed
+
+The `flight-client.rs` and `flight-server.rs` examples demonstrate how to run DataFusion as a standalone process and execute SQL queries from a client using the Flight protocol.
\ No newline at end of file
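For reference, a likely way to run the two new examples (an untested sketch; the test-data path is illustrative, and the server reads `PARQUET_TEST_DATA` as shown in `flight-server.rs` below):

```console
$ cd rust/datafusion
$ PARQUET_TEST_DATA=/path/to/parquet-testing/data cargo run --example flight-server
$ cargo run --example flight-client   # from a second terminal
```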
diff --git a/rust/datafusion/examples/flight-client.rs b/rust/datafusion/examples/flight-client.rs
new file mode 100644
index 00000000000..97db7934620
--- /dev/null
+++ b/rust/datafusion/examples/flight-client.rs
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+use arrow::array::Int32Array;
+use arrow::datatypes::Schema;
+use arrow::flight::flight_data_to_batch;
+use flight::flight_service_client::FlightServiceClient;
+use flight::Ticket;
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let mut client = FlightServiceClient::connect("http://localhost:50051").await?;
+
+    let request = tonic::Request::new(Ticket {
+        ticket: "SELECT id FROM alltypes_plain".into(),
+    });
+
+    let mut stream = client.do_get(request).await?.into_inner();
+
+    // the schema should be the first message returned; otherwise the client should error
+    let flight_data = stream.message().await?.unwrap();
+    // decode the FlightData header into a Schema
+    let schema = Arc::new(Schema::try_from(&flight_data)?);
+    println!("Schema: {:?}", schema);
+
+    // all the remaining stream messages should be dictionary and record batches
+    while let Some(flight_data) = stream.message().await? {
+        // the schema was already received, so this message should be a record batch
+        let record_batch = flight_data_to_batch(&flight_data, schema.clone())?.unwrap();
+
+        println!(
+            "record_batch has {} columns and {} rows",
+            record_batch.num_columns(),
+            record_batch.num_rows()
+        );
+        let column = record_batch.column(0);
+        let column = column
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .expect("Unable to get column");
+        println!("Column 1: {:?}", column);
+    }
+
+    Ok(())
+}
diff --git a/rust/datafusion/examples/flight-server.rs b/rust/datafusion/examples/flight-server.rs
new file mode 100644
index 00000000000..e6cc22ac503
--- /dev/null
+++ b/rust/datafusion/examples/flight-server.rs
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::pin::Pin;
+
+use futures::Stream;
+use tonic::transport::Server;
+use tonic::{Request, Response, Status, Streaming};
+
+use datafusion::execution::context::ExecutionContext;
+
+use flight::{
+    flight_service_server::FlightService, flight_service_server::FlightServiceServer,
+    Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo,
+    HandshakeRequest, HandshakeResponse, PutResult, SchemaResult, Ticket,
+};
+
+#[derive(Clone)]
+pub struct FlightServiceImpl {}
+
+#[tonic::async_trait]
+impl FlightService for FlightServiceImpl {
+    type HandshakeStream = Pin<
+        Box<dyn Stream<Item = Result<HandshakeResponse, Status>> + Send + Sync + 'static>,
+    >;
+    type ListFlightsStream =
+        Pin<Box<dyn Stream<Item = Result<FlightInfo, Status>> + Send + Sync + 'static>>;
+    type DoGetStream =
+        Pin<Box<dyn Stream<Item = Result<FlightData, Status>> + Send + Sync + 'static>>;
+    type DoPutStream =
+        Pin<Box<dyn Stream<Item = Result<PutResult, Status>> + Send + Sync + 'static>>;
+    type DoActionStream = Pin<
+        Box<dyn Stream<Item = Result<flight::Result, Status>> + Send + Sync + 'static>,
+    >;
+    type ListActionsStream =
+        Pin<Box<dyn Stream<Item = Result<ActionType, Status>> + Send + Sync + 'static>>;
+
+    async fn do_get(
+        &self,
+        request: Request<Ticket>,
+    ) -> Result<Response<Self::DoGetStream>, Status> {
+        let ticket = request.into_inner();
+        match String::from_utf8(ticket.ticket.to_vec()) {
+            Ok(sql) => {
+                println!("do_get: {}", sql);
+
+                // create local execution context
+                let mut ctx = ExecutionContext::new();
+
+                let testdata = std::env::var("PARQUET_TEST_DATA")
+                    .expect("PARQUET_TEST_DATA not defined");
+
+                // register parquet file with the execution context
+                ctx.register_parquet(
+                    "alltypes_plain",
+                    &format!("{}/alltypes_plain.parquet", testdata),
+                )
+                .map_err(|e| to_tonic_err(&e))?;
+
+                // create the query plan
+                let plan = ctx
+                    .create_logical_plan(&sql)
+                    .and_then(|plan| ctx.optimize(&plan))
+                    .and_then(|plan| ctx.create_physical_plan(&plan, 1024 * 1024))
+                    .map_err(|e| to_tonic_err(&e))?;
+
+                // execute the query
+                let results = ctx.collect(plan.as_ref()).map_err(|e| to_tonic_err(&e))?;
+                if results.is_empty() {
+                    return Err(Status::internal("There were no results from ticket"));
+                }
+
+                // add an initial FlightData message that sends the schema
+                let schema = plan.schema();
+                let mut flights: Vec<Result<FlightData, Status>> =
+                    vec![Ok(FlightData::from(schema.as_ref()))];
+
+                let mut batches: Vec<Result<FlightData, Status>> = results
+                    .iter()
+                    .map(|batch| Ok(FlightData::from(batch)))
+                    .collect();
+
+                // append the batch vector to the schema vector, so that the first message sent is the schema
+                flights.append(&mut batches);
+
+                let output = futures::stream::iter(flights);
+
+                Ok(Response::new(Box::pin(output) as Self::DoGetStream))
+            }
+            Err(e) => Err(Status::invalid_argument(format!("Invalid ticket: {:?}", e))),
+        }
+    }
+
+    async fn handshake(
+        &self,
+        _request: Request<Streaming<HandshakeRequest>>,
+    ) -> Result<Response<Self::HandshakeStream>, Status> {
+        Err(Status::unimplemented("Not yet implemented"))
+    }
+
+    async fn list_flights(
+        &self,
+        _request: Request<Criteria>,
+    ) -> Result<Response<Self::ListFlightsStream>, Status> {
+        Err(Status::unimplemented("Not yet implemented"))
+    }
+
+    async fn get_flight_info(
+        &self,
+        _request: Request<FlightDescriptor>,
+    ) -> Result<Response<FlightInfo>, Status> {
+        Err(Status::unimplemented("Not yet implemented"))
+    }
+
+    async fn get_schema(
+        &self,
+        _request: Request<FlightDescriptor>,
+    ) -> Result<Response<SchemaResult>, Status> {
+        Err(Status::unimplemented("Not yet implemented"))
+    }
+
+    async fn do_put(
+        &self,
+        _request: Request<Streaming<FlightData>>,
+    ) -> Result<Response<Self::DoPutStream>, Status> {
+        Err(Status::unimplemented("Not yet implemented"))
+    }
+
+    async fn do_action(
+        &self,
+        _request: Request<Action>,
+    ) -> Result<Response<Self::DoActionStream>, Status> {
+        Err(Status::unimplemented("Not yet implemented"))
+    }
+
+    async fn list_actions(
+        &self,
+        _request: Request<Empty>,
+    ) -> Result<Response<Self::ListActionsStream>, Status> {
+        Err(Status::unimplemented("Not yet implemented"))
+    }
+}
+
+fn to_tonic_err(e: &datafusion::error::ExecutionError) -> Status {
+    Status::internal(format!("{:?}", e))
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    let addr = "0.0.0.0:50051".parse()?;
+    let service = FlightServiceImpl {};
+
+    let svc = FlightServiceServer::new(service);
+
+    println!("Listening on {:?}", addr);
+
+    Server::builder().add_service(svc).serve(addr).await?;
+
+    Ok(())
+}