Skip to content
Closed
68 changes: 25 additions & 43 deletions rust/arrow-flight/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,60 +26,41 @@ use arrow::error::{ArrowError, Result};
use arrow::ipc::{convert, reader, writer, writer::IpcWriteOptions};
use arrow::record_batch::RecordBatch;

/// Convert a `RecordBatch` to `FlightData` by converting the header and body to bytes
///
/// Note: This implicitly uses the default `IpcWriteOptions`. To configure options,
/// use `flight_data_from_arrow_batch()`
impl From<&RecordBatch> for FlightData {
fn from(batch: &RecordBatch) -> Self {
let options = IpcWriteOptions::default();
flight_data_from_arrow_batch(batch, &options)
}
}

/// Convert a `RecordBatch` to `FlightData` by converting the header and body to bytes
/// Convert a `RecordBatch` to a vector of `FlightData` representing the bytes of the dictionaries
/// and values
pub fn flight_data_from_arrow_batch(
batch: &RecordBatch,
options: &IpcWriteOptions,
) -> FlightData {
let data = writer::record_batch_to_bytes(batch, &options);
FlightData {
flight_descriptor: None,
app_metadata: vec![],
data_header: data.ipc_message,
data_body: data.arrow_data,
}
}
) -> Vec<FlightData> {
let data_gen = writer::IpcDataGenerator::default();
let mut dictionary_tracker = writer::DictionaryTracker::new(false);

/// Convert a `Schema` to `SchemaResult` by converting to an IPC message
///
/// Note: This implicitly uses the default `IpcWriteOptions`. To configure options,
/// use `flight_schema_from_arrow_schema()`
impl From<&Schema> for SchemaResult {
fn from(schema: &Schema) -> Self {
let options = IpcWriteOptions::default();
flight_schema_from_arrow_schema(schema, &options)
}
let (encoded_dictionaries, encoded_batch) = data_gen
.encoded_batch(batch, &mut dictionary_tracker, &options)
.expect("DictionaryTracker configured above to not error on replacement");

encoded_dictionaries
.into_iter()
.chain(std::iter::once(encoded_batch))
.map(|data| FlightData {
flight_descriptor: None,
app_metadata: vec![],
data_header: data.ipc_message,
data_body: data.arrow_data,
})
.collect()
}

/// Convert a `Schema` to `SchemaResult` by converting to an IPC message
pub fn flight_schema_from_arrow_schema(
schema: &Schema,
options: &IpcWriteOptions,
) -> SchemaResult {
SchemaResult {
schema: writer::schema_to_bytes(schema, &options).ipc_message,
}
}
let data_gen = writer::IpcDataGenerator::default();
let schema_bytes = data_gen.schema_to_bytes(schema, &options);

/// Convert a `Schema` to `FlightData` by converting to an IPC message
///
/// Note: This implicitly uses the default `IpcWriteOptions`. To configure options,
/// use `flight_data_from_arrow_schema()`
impl From<&Schema> for FlightData {
fn from(schema: &Schema) -> Self {
let options = writer::IpcWriteOptions::default();
flight_data_from_arrow_schema(schema, &options)
SchemaResult {
schema: schema_bytes.ipc_message,
}
}

Expand All @@ -88,7 +69,8 @@ pub fn flight_data_from_arrow_schema(
schema: &Schema,
options: &IpcWriteOptions,
) -> FlightData {
let schema = writer::schema_to_bytes(schema, &options);
let data_gen = writer::IpcDataGenerator::default();
let schema = data_gen.schema_to_bytes(schema, &options);
FlightData {
flight_descriptor: None,
app_metadata: vec![],
Expand Down
Loading