Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 55 additions & 22 deletions rust/arrow-flight/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,44 +23,77 @@ use crate::{FlightData, SchemaResult};

use arrow::datatypes::{Schema, SchemaRef};
use arrow::error::{ArrowError, Result};
use arrow::ipc::{convert, reader, writer};
use arrow::ipc::{convert, reader, writer, writer::IpcWriteOptions};
use arrow::record_batch::RecordBatch;

/// Convert a `RecordBatch` to `FlightData` by getting the header and body as bytes
/// Convert a `RecordBatch` to `FlightData` by converting the header and body to bytes
///
/// Note: This implicitly uses the default `IpcWriteOptions`. To configure options,
/// use `flight_data_from_arrow_batch()`
impl From<&RecordBatch> for FlightData {
fn from(batch: &RecordBatch) -> Self {
let options = writer::IpcWriteOptions::default();
let data = writer::record_batch_to_bytes(batch, &options);
Self {
flight_descriptor: None,
app_metadata: vec![],
data_header: data.ipc_message,
data_body: data.arrow_data,
}
let options = IpcWriteOptions::default();
flight_data_from_arrow_batch(batch, &options)
}
}

/// Convert a `RecordBatch` to `FlightData` by converting the header and body to bytes
pub fn flight_data_from_arrow_batch(
batch: &RecordBatch,
options: &IpcWriteOptions,
) -> FlightData {
let data = writer::record_batch_to_bytes(batch, &options);
FlightData {
flight_descriptor: None,
app_metadata: vec![],
data_header: data.ipc_message,
data_body: data.arrow_data,
}
}

/// Convert a `Schema` to `SchemaResult` by converting to an IPC message
///
/// Note: This implicitly uses the default `IpcWriteOptions`. To configure options,
/// use `flight_schema_from_arrow_schema()`
impl From<&Schema> for SchemaResult {
fn from(schema: &Schema) -> Self {
let options = writer::IpcWriteOptions::default();
Self {
schema: writer::schema_to_bytes(schema, &options).ipc_message,
}
let options = IpcWriteOptions::default();
flight_schema_from_arrow_schema(schema, &options)
}
}

/// Convert a `Schema` to `SchemaResult` by converting to an IPC message
pub fn flight_schema_from_arrow_schema(
schema: &Schema,
options: &IpcWriteOptions,
) -> SchemaResult {
SchemaResult {
schema: writer::schema_to_bytes(schema, &options).ipc_message,
}
}

/// Convert a `Schema` to `FlightData` by converting to an IPC message
///
/// Note: This implicitly uses the default `IpcWriteOptions`. To configure options,
/// use `flight_data_from_arrow_schema()`
impl From<&Schema> for FlightData {
fn from(schema: &Schema) -> Self {
let options = writer::IpcWriteOptions::default();
let schema = writer::schema_to_bytes(schema, &options);
Self {
flight_descriptor: None,
app_metadata: vec![],
data_header: schema.ipc_message,
data_body: vec![],
}
flight_data_from_arrow_schema(schema, &options)
}
}

/// Convert a `Schema` to `FlightData` by converting to an IPC message
pub fn flight_data_from_arrow_schema(
schema: &Schema,
options: &IpcWriteOptions,
) -> FlightData {
let schema = writer::schema_to_bytes(schema, &options);
FlightData {
flight_descriptor: None,
app_metadata: vec![],
data_header: schema.ipc_message,
data_body: vec![],
}
}

Expand Down Expand Up @@ -93,7 +126,7 @@ impl TryFrom<&SchemaResult> for Schema {
}

/// Convert a FlightData message to a RecordBatch
pub fn flight_data_to_batch(
pub fn flight_data_to_arrow_batch(
data: &FlightData,
schema: SchemaRef,
) -> Result<Option<RecordBatch>> {
Expand Down
5 changes: 3 additions & 2 deletions rust/datafusion/examples/flight_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use arrow::util::pretty;

use arrow_flight::flight_descriptor;
use arrow_flight::flight_service_client::FlightServiceClient;
use arrow_flight::utils::flight_data_to_batch;
use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::{FlightDescriptor, Ticket};

#[tokio::main]
Expand Down Expand Up @@ -62,7 +62,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut results = vec![];
while let Some(flight_data) = stream.message().await? {
// the unwrap is infallible and thus safe
let record_batch = flight_data_to_batch(&flight_data, schema.clone())?.unwrap();
let record_batch =
flight_data_to_arrow_batch(&flight_data, schema.clone())?.unwrap();
results.push(record_batch);
}

Expand Down