diff --git a/ballista-examples/Cargo.toml b/ballista-examples/Cargo.toml index e0989b942ab97..55a5916f12a38 100644 --- a/ballista-examples/Cargo.toml +++ b/ballista-examples/Cargo.toml @@ -28,7 +28,7 @@ edition = "2018" publish = false [dependencies] -arrow-flight = { version = "^5.3" } +arrow-flight = { version = "6.0.0" } datafusion = { path = "../datafusion" } ballista = { path = "../ballista/rust/client" } prost = "0.8" diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml index b3a4e42ac0d90..4518e778e84d7 100644 --- a/ballista/rust/core/Cargo.toml +++ b/ballista/rust/core/Cargo.toml @@ -43,7 +43,7 @@ tonic = "0.5" uuid = { version = "0.8", features = ["v4"] } chrono = "0.4" -arrow-flight = { version = "^5.3" } +arrow-flight = { version = "6.0.0" } datafusion = { path = "../../../datafusion", version = "5.1.0" } diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index ba7daca18a4e7..e79e6549620ec 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -360,6 +360,9 @@ impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum { fractional: *fractional as u64, }) } + DataType::Map(_, _) => { + unimplemented!("Ballista does not yet support Map data type") + } } } } @@ -490,6 +493,7 @@ impl TryFrom<&DataType> for protobuf::scalar_type::Datatype { | DataType::Struct(_) | DataType::Union(_) | DataType::Dictionary(_, _) + | DataType::Map(_, _) | DataType::Decimal(_, _) => { return Err(proto_error(format!( "Error converting to Datatype to scalar type, {:?} is invalid as a datafusion scalar.", diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml index c6e0ab841d877..231b05f9e259d 100644 --- a/ballista/rust/executor/Cargo.toml +++ b/ballista/rust/executor/Cargo.toml @@ -29,8 +29,8 @@ edition = "2018" snmalloc = ["snmalloc-rs"] [dependencies] -arrow = { version = "^5.3" } -arrow-flight = { version = "^5.3" } +arrow = { version = "6.0.0" } +arrow-flight = { version = "6.0.0" } anyhow = "1" async-trait = "0.1.36" ballista-core = { path = "../core", version = "0.6.0" } diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 3b9be677c7899..22196cad26d9d 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -31,5 +31,5 @@ clap = "2.33" rustyline = "8.0" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] } datafusion = { path = "../datafusion", version = "5.1.0" } -arrow = { version = "^5.3" } +arrow = { version = "6.0.0" } ballista = { path = "../ballista/rust/client", version = "0.6.0" } diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index 113cd5bb91039..9f151f012e225 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -33,7 +33,7 @@ path = "examples/avro_sql.rs" required-features = ["datafusion/avro"] [dev-dependencies] -arrow-flight = { version = "^5.3" } +arrow-flight = { version = "6.0.0" } datafusion = { path = "../datafusion" } prost = "0.8" tonic = "0.5" diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index 4fa646165d954..6e8b5216f07ea 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -50,8 +50,8 @@ avro = ["avro-rs", "num-traits"] [dependencies] ahash = "0.7" hashbrown = { version = "0.11", features = ["raw"] } -arrow = { version = "^5.3", features = ["prettyprint"] } -parquet = { version = "^5.3", features = ["arrow"] } +arrow = { version = "6.0.0", features = ["prettyprint"] } +parquet = { version = "6.0.0", features = ["arrow"] } sqlparser = "0.12" paste = "^1.0" num_cpus = "1.13.0" diff --git a/datafusion/src/avro_to_arrow/arrow_array_reader.rs b/datafusion/src/avro_to_arrow/arrow_array_reader.rs index cc8ed8e669426..bb863d6311019 100644 --- a/datafusion/src/avro_to_arrow/arrow_array_reader.rs +++ b/datafusion/src/avro_to_arrow/arrow_array_reader.rs @@ -484,6 +484,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> { .add_buffer(bool_values.into()) .null_bit_buffer(bool_nulls.into()) .build() + .unwrap() } DataType::Int8 => self.read_primitive_list_values::(rows), DataType::Int16 => self.read_primitive_list_values::(rows), @@ -569,6 +570,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> { .null_bit_buffer(buf) .child_data(arrays.into_iter().map(|a| a.data().clone()).collect()) .build() + .unwrap() } datatype => { return Err(ArrowError::SchemaError(format!( @@ -583,7 +585,8 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> { .add_buffer(Buffer::from_slice_ref(&offsets)) .add_child_data(array_data) .null_bit_buffer(list_nulls.into()) - .build(); + .build() + .unwrap(); Ok(Arc::new(GenericListArray::::from(list_data))) } @@ -776,7 +779,8 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> { .child_data( arrays.into_iter().map(|a| a.data().clone()).collect(), ) - .build(); + .build() + .unwrap(); Ok(make_array(data)) } _ => Err(ArrowError::SchemaError(format!( diff --git a/datafusion/src/avro_to_arrow/schema.rs b/datafusion/src/avro_to_arrow/schema.rs index c2927f0829ba6..c6eda80170129 100644 --- a/datafusion/src/avro_to_arrow/schema.rs +++ b/datafusion/src/avro_to_arrow/schema.rs @@ -213,6 +213,7 @@ fn default_field_name(dt: &DataType) -> &str { DataType::Struct(_) => "struct", DataType::Union(_) => "union", DataType::Dictionary(_, _) => "map", + DataType::Map(_, _) => unimplemented!("Map support not implemented"), DataType::Decimal(_, _) => "decimal", } } diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 23667f5a6ec81..627f7ca46e332 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -1970,7 +1970,12 @@ mod tests { let results = execute("SELECT c1, AVG(c2) FROM test WHERE c1 = 123 GROUP BY c1", 4).await?; - let expected = vec!["++", "||", "++", "++"]; + let expected = vec![ + "+----+--------------+", + "| c1 | AVG(test.c2) |", + "+----+--------------+", + "+----+--------------+", + ]; assert_batches_sorted_eq!(expected, &results); Ok(()) diff --git a/datafusion/src/physical_plan/expressions/in_list.rs b/datafusion/src/physical_plan/expressions/in_list.rs index 00767c7a67079..826ffa87ae835 100644 --- a/datafusion/src/physical_plan/expressions/in_list.rs +++ b/datafusion/src/physical_plan/expressions/in_list.rs @@ -47,15 +47,17 @@ macro_rules! compare_op_scalar { // same as $left.len() let buffer = unsafe { MutableBuffer::from_trusted_len_iter_bool(comparison) }; - let data = ArrayData::new( - DataType::Boolean, - $left.len(), - None, - null_bit_buffer, - 0, - vec![Buffer::from(buffer)], - vec![], - ); + let data = unsafe { + ArrayData::new_unchecked( + DataType::Boolean, + $left.len(), + None, + null_bit_buffer, + 0, + vec![Buffer::from(buffer)], + vec![], + ) + }; Ok(BooleanArray::from(data)) }}; } diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 2a14093e7db8c..2ed0faaa357b4 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -678,11 +678,13 @@ fn build_join_indexes( let left = ArrayData::builder(DataType::UInt64) .len(left_indices.len()) .add_buffer(left_indices.finish()) - .build(); + .build() + .unwrap(); let right = ArrayData::builder(DataType::UInt32) .len(right_indices.len()) .add_buffer(right_indices.finish()) - .build(); + .build() + .unwrap(); Ok(( PrimitiveArray::::from(left), diff --git a/datafusion/src/physical_plan/sort_preserving_merge.rs b/datafusion/src/physical_plan/sort_preserving_merge.rs index 0b75f46e5e02c..5aaf9789f6997 100644 --- a/datafusion/src/physical_plan/sort_preserving_merge.rs +++ b/datafusion/src/physical_plan/sort_preserving_merge.rs @@ -963,6 +963,10 @@ mod tests { expr: col("c7", &schema).unwrap(), options: SortOptions::default(), }, + PhysicalSortExpr { + expr: col("c12", &schema).unwrap(), + options: SortOptions::default(), + }, ]; let basic = basic_sort(csv.clone(), sort.clone()).await; @@ -971,7 +975,11 @@ mod tests { let basic = arrow::util::pretty::pretty_format_batches(&[basic]).unwrap(); let partition = arrow::util::pretty::pretty_format_batches(&[partition]).unwrap(); - assert_eq!(basic, partition); + assert_eq!( + basic, partition, + "basic:\n\n{}\n\npartition:\n\n{}\n\n", + basic, partition + ); } // Split the provided record batch into multiple batch_size record batches @@ -1183,7 +1191,7 @@ mod tests { async fn test_async() { let schema = test::aggr_test_schema(); let sort = vec![PhysicalSortExpr { - expr: col("c7", &schema).unwrap(), + expr: col("c12", &schema).unwrap(), options: SortOptions::default(), }]; @@ -1234,7 +1242,11 @@ mod tests { let basic = arrow::util::pretty::pretty_format_batches(&[basic]).unwrap(); let partition = arrow::util::pretty::pretty_format_batches(&[merged]).unwrap(); - assert_eq!(basic, partition); + assert_eq!( + basic, partition, + "basic:\n\n{}\n\npartition:\n\n{}\n\n", + basic, partition + ); } #[tokio::test]