diff --git a/rust/arrow-flight/src/utils.rs b/rust/arrow-flight/src/utils.rs index c28e39b8842..dbb5781aec9 100644 --- a/rust/arrow-flight/src/utils.rs +++ b/rust/arrow-flight/src/utils.rs @@ -23,44 +23,77 @@ use crate::{FlightData, SchemaResult}; use arrow::datatypes::{Schema, SchemaRef}; use arrow::error::{ArrowError, Result}; -use arrow::ipc::{convert, reader, writer}; +use arrow::ipc::{convert, reader, writer, writer::IpcWriteOptions}; use arrow::record_batch::RecordBatch; -/// Convert a `RecordBatch` to `FlightData` by getting the header and body as bytes +/// Convert a `RecordBatch` to `FlightData` by converting the header and body to bytes +/// +/// Note: This implicitly uses the default `IpcWriteOptions`. To configure options, +/// use `flight_data_from_arrow_batch()` impl From<&RecordBatch> for FlightData { fn from(batch: &RecordBatch) -> Self { - let options = writer::IpcWriteOptions::default(); - let data = writer::record_batch_to_bytes(batch, &options); - Self { - flight_descriptor: None, - app_metadata: vec![], - data_header: data.ipc_message, - data_body: data.arrow_data, - } + let options = IpcWriteOptions::default(); + flight_data_from_arrow_batch(batch, &options) + } +} + +/// Convert a `RecordBatch` to `FlightData` by converting the header and body to bytes +pub fn flight_data_from_arrow_batch( + batch: &RecordBatch, + options: &IpcWriteOptions, +) -> FlightData { + let data = writer::record_batch_to_bytes(batch, &options); + FlightData { + flight_descriptor: None, + app_metadata: vec![], + data_header: data.ipc_message, + data_body: data.arrow_data, } } /// Convert a `Schema` to `SchemaResult` by converting to an IPC message +/// +/// Note: This implicitly uses the default `IpcWriteOptions`. To configure options, +/// use `flight_schema_from_arrow_schema()` impl From<&Schema> for SchemaResult { fn from(schema: &Schema) -> Self { - let options = writer::IpcWriteOptions::default(); - Self { - schema: writer::schema_to_bytes(schema, &options).ipc_message, - } + let options = IpcWriteOptions::default(); + flight_schema_from_arrow_schema(schema, &options) + } +} + +/// Convert a `Schema` to `SchemaResult` by converting to an IPC message +pub fn flight_schema_from_arrow_schema( + schema: &Schema, + options: &IpcWriteOptions, +) -> SchemaResult { + SchemaResult { + schema: writer::schema_to_bytes(schema, &options).ipc_message, } } /// Convert a `Schema` to `FlightData` by converting to an IPC message +/// +/// Note: This implicitly uses the default `IpcWriteOptions`. To configure options, +/// use `flight_data_from_arrow_schema()` impl From<&Schema> for FlightData { fn from(schema: &Schema) -> Self { let options = writer::IpcWriteOptions::default(); - let schema = writer::schema_to_bytes(schema, &options); - Self { - flight_descriptor: None, - app_metadata: vec![], - data_header: schema.ipc_message, - data_body: vec![], - } + flight_data_from_arrow_schema(schema, &options) + } +} + +/// Convert a `Schema` to `FlightData` by converting to an IPC message +pub fn flight_data_from_arrow_schema( + schema: &Schema, + options: &IpcWriteOptions, +) -> FlightData { + let schema = writer::schema_to_bytes(schema, &options); + FlightData { + flight_descriptor: None, + app_metadata: vec![], + data_header: schema.ipc_message, + data_body: vec![], } } @@ -93,7 +126,7 @@ impl TryFrom<&SchemaResult> for Schema { } /// Convert a FlightData message to a RecordBatch -pub fn flight_data_to_batch( +pub fn flight_data_to_arrow_batch( data: &FlightData, schema: SchemaRef, ) -> Result> { diff --git a/rust/datafusion/examples/flight_client.rs b/rust/datafusion/examples/flight_client.rs index f33714f553c..3bc2a04a499 100644 --- a/rust/datafusion/examples/flight_client.rs +++ b/rust/datafusion/examples/flight_client.rs @@ -23,7 +23,7 @@ use arrow::util::pretty; use arrow_flight::flight_descriptor; use arrow_flight::flight_service_client::FlightServiceClient; -use arrow_flight::utils::flight_data_to_batch; +use arrow_flight::utils::flight_data_to_arrow_batch; use arrow_flight::{FlightDescriptor, Ticket}; #[tokio::main] @@ -62,7 +62,8 @@ async fn main() -> Result<(), Box> { let mut results = vec![]; while let Some(flight_data) = stream.message().await? { // the unwrap is infallible and thus safe - let record_batch = flight_data_to_batch(&flight_data, schema.clone())?.unwrap(); + let record_batch = + flight_data_to_arrow_batch(&flight_data, schema.clone())?.unwrap(); results.push(record_batch); }