From e49f99e7a394796287e5d4410d9707e1a62e56db Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 13 Jul 2021 12:43:26 -0400 Subject: [PATCH 1/2] Update to arrow 5.0 --- ballista/rust/core/Cargo.toml | 2 +- ballista/rust/executor/Cargo.toml | 4 ++-- datafusion-cli/Cargo.toml | 2 +- datafusion-examples/Cargo.toml | 2 +- datafusion/Cargo.toml | 6 +++--- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml index 3a89c75a5cd72..ce72d2fda92d4 100644 --- a/ballista/rust/core/Cargo.toml +++ b/ballista/rust/core/Cargo.toml @@ -42,7 +42,7 @@ tokio = "1.0" tonic = "0.4" uuid = { version = "0.8", features = ["v4"] } -arrow-flight = { version = "4.0" } +arrow-flight = { version = "5.0" } datafusion = { path = "../../../datafusion" } diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml index 68e4920f3b40b..428a5bb0f01f5 100644 --- a/ballista/rust/executor/Cargo.toml +++ b/ballista/rust/executor/Cargo.toml @@ -43,8 +43,8 @@ tokio-stream = { version = "0.1", features = ["net"] } tonic = "0.4" uuid = { version = "0.8", features = ["v4"] } -arrow = { version = "4.0" } -arrow-flight = { version = "4.0" } +arrow = { version = "5.0" } +arrow-flight = { version = "5.0" } datafusion = { path = "../../../datafusion" } diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index cd17b61984d5e..fda9271876aa8 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -31,4 +31,4 @@ clap = "2.33" rustyline = "8.0" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] } datafusion = { path = "../datafusion" } -arrow = { version = "4.0" } +arrow = { version = "5.0" } diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index 886f8f5e74f68..35aa3764d6dc4 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -29,7 +29,7 @@ publish = false [dev-dependencies] -arrow-flight = { version = 
"4.0" } +arrow-flight = { version = "5.0" } datafusion = { path = "../datafusion" } prost = "0.7" tonic = "0.4" diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index 845de6213f4d3..2f1e997c3596f 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -46,8 +46,8 @@ unicode_expressions = ["unicode-segmentation"] [dependencies] ahash = "0.7" hashbrown = "0.11" -arrow = { version = "4.4", features = ["prettyprint"] } -parquet = { version = "4.4", features = ["arrow"] } +arrow = { version = "5.0", features = ["prettyprint"] } +parquet = { version = "5.0", features = ["arrow"] } sqlparser = "0.9.0" paste = "^1.0" num_cpus = "1.13.0" @@ -98,4 +98,4 @@ harness = false [[bench]] name = "physical_plan" -harness = false \ No newline at end of file +harness = false From f8c46ba57f48ad707cfa48b24572589a257cd2e2 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 13 Jul 2021 13:09:09 -0400 Subject: [PATCH 2/2] Update to new interfaces --- ballista/rust/executor/src/flight_service.rs | 6 ++---- datafusion-examples/examples/flight_server.rs | 11 +++-------- datafusion/src/physical_plan/hash_aggregate.rs | 4 ++-- datafusion/src/physical_plan/mod.rs | 5 +++-- datafusion/src/scalar.rs | 2 +- 5 files changed, 11 insertions(+), 17 deletions(-) diff --git a/ballista/rust/executor/src/flight_service.rs b/ballista/rust/executor/src/flight_service.rs index 7325287f074f5..9a3f2d872d521 100644 --- a/ballista/rust/executor/src/flight_service.rs +++ b/ballista/rust/executor/src/flight_service.rs @@ -23,6 +23,7 @@ use std::pin::Pin; use std::sync::Arc; use crate::executor::Executor; +use arrow_flight::SchemaAsIpc; use ballista_core::error::BallistaError; use ballista_core::serde::decode_protobuf; use ballista_core::serde::scheduler::Action as BallistaAction; @@ -218,10 +219,7 @@ where T: Read + Seek, { let options = arrow::ipc::writer::IpcWriteOptions::default(); - let schema_flight_data = arrow_flight::utils::flight_data_from_arrow_schema( - reader.schema().as_ref(), - 
&options, - ); + let schema_flight_data = SchemaAsIpc::new(reader.schema().as_ref(), &options).into(); send_response(&tx, Ok(schema_flight_data)).await?; for batch in reader { diff --git a/datafusion-examples/examples/flight_server.rs b/datafusion-examples/examples/flight_server.rs index 8496bcb18914f..138434ea2482f 100644 --- a/datafusion-examples/examples/flight_server.rs +++ b/datafusion-examples/examples/flight_server.rs @@ -17,6 +17,7 @@ use std::pin::Pin; +use arrow_flight::SchemaAsIpc; use futures::Stream; use tonic::transport::Server; use tonic::{Request, Response, Status, Streaming}; @@ -67,10 +68,7 @@ impl FlightService for FlightServiceImpl { let table = ParquetTable::try_new(&request.path[0], num_cpus::get()).unwrap(); let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default(); - let schema_result = arrow_flight::utils::flight_schema_from_arrow_schema( - table.schema().as_ref(), - &options, - ); + let schema_result = SchemaAsIpc::new(table.schema().as_ref(), &options).into(); Ok(Response::new(schema_result)) } @@ -108,10 +106,7 @@ impl FlightService for FlightServiceImpl { // add an initial FlightData message that sends schema let options = datafusion::arrow::ipc::writer::IpcWriteOptions::default(); let schema_flight_data = - arrow_flight::utils::flight_data_from_arrow_schema( - &df.schema().clone().into(), - &options, - ); + SchemaAsIpc::new(&df.schema().clone().into(), &options).into(); let mut flights: Vec<Result<FlightData, Status>> = vec![Ok(schema_flight_data)]; diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs index b4b7c224024d3..ae513831bef46 100644 --- a/datafusion/src/physical_plan/hash_aggregate.rs +++ b/datafusion/src/physical_plan/hash_aggregate.rs @@ -508,7 +508,7 @@ fn dictionary_create_key_for_col( )) })?; - create_key_for_col(&dict_col.values(), values_index, vec) + create_key_for_col(dict_col.values(), values_index, vec) } /// Appends a sequence of [u8] bytes for the value in `col[row]` to 
@@ -1104,7 +1104,7 @@ fn dictionary_create_group_by_value( )) })?; - create_group_by_value(&dict_col.values(), values_index) + create_group_by_value(dict_col.values(), values_index) } /// Extract the value in `col[row]` as a GroupByScalar diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index d89eb11885041..b3c0dd63e9eda 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -505,8 +505,9 @@ pub trait WindowExpr: Send + Sync + Debug { end: num_rows, }]) } else { - lexicographical_partition_ranges(partition_columns) - .map_err(DataFusionError::ArrowError) + Ok(lexicographical_partition_ranges(partition_columns) + .map_err(DataFusionError::ArrowError)? + .collect::<Vec<_>>()) } } diff --git a/datafusion/src/scalar.rs b/datafusion/src/scalar.rs index f94a090a538a1..e7354f8e62ec1 100644 --- a/datafusion/src/scalar.rs +++ b/datafusion/src/scalar.rs @@ -778,7 +778,7 @@ impl ScalarValue { keys_col.data_type() )) })?; - Self::try_from_array(&dict_array.values(), values_index) + Self::try_from_array(dict_array.values(), values_index) } }