From 7f6a02ecd2819f37cfb7bd8b94467a5793ed4d03 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 9 Sep 2021 12:40:28 -0400 Subject: [PATCH 01/10] Update to arrow 6.0 --- Cargo.toml | 1 + ballista-examples/Cargo.toml | 2 +- ballista/rust/core/Cargo.toml | 2 +- ballista/rust/executor/Cargo.toml | 4 ++-- datafusion-cli/Cargo.toml | 2 +- datafusion-examples/Cargo.toml | 2 +- datafusion/Cargo.toml | 4 ++-- python/Cargo.toml | 1 + 8 files changed, 10 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d6da8c14cd964..5b930e4093ca9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,3 +29,4 @@ members = [ ] exclude = ["python"] + diff --git a/ballista-examples/Cargo.toml b/ballista-examples/Cargo.toml index e0989b942ab97..55a5916f12a38 100644 --- a/ballista-examples/Cargo.toml +++ b/ballista-examples/Cargo.toml @@ -28,7 +28,7 @@ edition = "2018" publish = false [dependencies] -arrow-flight = { version = "^5.3" } +arrow-flight = { version = "6.0.0" } datafusion = { path = "../datafusion" } ballista = { path = "../ballista/rust/client" } prost = "0.8" diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml index b3a4e42ac0d90..4518e778e84d7 100644 --- a/ballista/rust/core/Cargo.toml +++ b/ballista/rust/core/Cargo.toml @@ -43,7 +43,7 @@ tonic = "0.5" uuid = { version = "0.8", features = ["v4"] } chrono = "0.4" -arrow-flight = { version = "^5.3" } +arrow-flight = { version = "6.0.0" } datafusion = { path = "../../../datafusion", version = "5.1.0" } diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml index c6e0ab841d877..231b05f9e259d 100644 --- a/ballista/rust/executor/Cargo.toml +++ b/ballista/rust/executor/Cargo.toml @@ -29,8 +29,8 @@ edition = "2018" snmalloc = ["snmalloc-rs"] [dependencies] -arrow = { version = "^5.3" } -arrow-flight = { version = "^5.3" } +arrow = { version = "6.0.0" } +arrow-flight = { version = "6.0.0" } anyhow = "1" async-trait = "0.1.36" ballista-core = { path = "../core", version = "0.6.0" } diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 3b9be677c7899..22196cad26d9d 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -31,5 +31,5 @@ clap = "2.33" rustyline = "8.0" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] } datafusion = { path = "../datafusion", version = "5.1.0" } -arrow = { version = "^5.3" } +arrow = { version = "6.0.0" } ballista = { path = "../ballista/rust/client", version = "0.6.0" } diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index 113cd5bb91039..9f151f012e225 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -33,7 +33,7 @@ path = "examples/avro_sql.rs" required-features = ["datafusion/avro"] [dev-dependencies] -arrow-flight = { version = "^5.3" } +arrow-flight = { version = "6.0.0" } datafusion = { path = "../datafusion" } prost = "0.8" tonic = "0.5" diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index 4fa646165d954..6e8b5216f07ea 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -50,8 +50,8 @@ avro = ["avro-rs", "num-traits"] [dependencies] ahash = "0.7" hashbrown = { version = "0.11", features = ["raw"] } -arrow = { version = "^5.3", features = ["prettyprint"] } -parquet = { version = "^5.3", features = ["arrow"] } +arrow = { version = "6.0.0", features = ["prettyprint"] } +parquet = { version = "6.0.0", features = ["arrow"] } sqlparser = "0.12" paste = "^1.0" num_cpus = "1.13.0" diff --git a/python/Cargo.toml b/python/Cargo.toml index 731edcb5c9e82..3ce1baea92f3a 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -37,3 +37,4 @@ uuid = { version = "0.8", features = ["v4"] } [lib] name = "datafusion" crate-type = ["cdylib"] + From ef57066ed20d95c1b091f30377109f162cdfa9e2 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 9 Sep 2021 12:46:30 -0400 Subject: [PATCH 02/10] Update empty result --- datafusion/src/execution/context.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 23667f5a6ec81..627f7ca46e332 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -1970,7 +1970,12 @@ mod tests { let results = execute("SELECT c1, AVG(c2) FROM test WHERE c1 = 123 GROUP BY c1", 4).await?; - let expected = vec!["++", "||", "++", "++"]; + let expected = vec![ + "+----+--------------+", + "| c1 | AVG(test.c2) |", + "+----+--------------+", + "+----+--------------+", + ]; assert_batches_sorted_eq!(expected, &results); Ok(()) From 819ce3b2a8ede4d246b52052f98d56402c8976dd Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 9 Sep 2021 13:32:27 -0400 Subject: [PATCH 03/10] Fix up sort_preserving_merge test --- .../physical_plan/sort_preserving_merge.rs | 28 +++++++++++++++---- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/datafusion/src/physical_plan/sort_preserving_merge.rs b/datafusion/src/physical_plan/sort_preserving_merge.rs index 0b75f46e5e02c..169df9d9d14b7 100644 --- a/datafusion/src/physical_plan/sort_preserving_merge.rs +++ b/datafusion/src/physical_plan/sort_preserving_merge.rs @@ -1182,10 +1182,24 @@ mod tests { #[tokio::test] async fn test_async() { let schema = test::aggr_test_schema(); - let sort = vec![PhysicalSortExpr { - expr: col("c7", &schema).unwrap(), - options: SortOptions::default(), - }]; + let sort = vec![ + PhysicalSortExpr { + expr: col("c7", &schema).unwrap(), + options: SortOptions::default(), + }, + PhysicalSortExpr { + expr: col("c1", &schema).unwrap(), + options: SortOptions::default(), + }, + PhysicalSortExpr { + expr: col("c2", &schema).unwrap(), + options: SortOptions::default(), + }, + PhysicalSortExpr { + expr: col("c3", &schema).unwrap(), + options: SortOptions::default(), + }, + ]; let batches = sorted_partitioned_input(sort.clone(), &[5, 7, 3]).await; @@ -1234,7 +1248,11 @@ mod tests { let basic = arrow::util::pretty::pretty_format_batches(&[basic]).unwrap(); let partition = arrow::util::pretty::pretty_format_batches(&[merged]).unwrap(); - assert_eq!(basic, partition); + assert_eq!( + basic, partition, + "basic:\n\n{}\n\npartition:\n\n{}\n\n", + basic, partition + ); } #[tokio::test] From 9ab55fb781da1491c5f0f8b20f8f6bdb05c9ad63 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sun, 19 Sep 2021 06:51:30 -0400 Subject: [PATCH 04/10] Use column c12, per @yjshen --- .../physical_plan/sort_preserving_merge.rs | 22 ++++--------------- 1 file changed, 4 insertions(+), 18 deletions(-) diff --git a/datafusion/src/physical_plan/sort_preserving_merge.rs b/datafusion/src/physical_plan/sort_preserving_merge.rs index 169df9d9d14b7..6d313b687b9b8 100644 --- a/datafusion/src/physical_plan/sort_preserving_merge.rs +++ b/datafusion/src/physical_plan/sort_preserving_merge.rs @@ -1182,24 +1182,10 @@ mod tests { #[tokio::test] async fn test_async() { let schema = test::aggr_test_schema(); - let sort = vec![ - PhysicalSortExpr { - expr: col("c7", &schema).unwrap(), - options: SortOptions::default(), - }, - PhysicalSortExpr { - expr: col("c1", &schema).unwrap(), - options: SortOptions::default(), - }, - PhysicalSortExpr { - expr: col("c2", &schema).unwrap(), - options: SortOptions::default(), - }, - PhysicalSortExpr { - expr: col("c3", &schema).unwrap(), - options: SortOptions::default(), - }, - ]; + let sort = vec![PhysicalSortExpr { + expr: col("c12", &schema).unwrap(), + options: SortOptions::default(), + }]; let batches = sorted_partitioned_input(sort.clone(), &[5, 7, 3]).await; From 537552ae57b55b11e5437992dcd297da8eb9d5c2 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 9 Sep 2021 14:04:55 -0400 Subject: [PATCH 05/10] Add minimal ballista support --- ballista/rust/core/src/serde/logical_plan/to_proto.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ballista/rust/core/src/serde/logical_plan/to_proto.rs b/ballista/rust/core/src/serde/logical_plan/to_proto.rs index ba7daca18a4e7..e79e6549620ec 100644 --- a/ballista/rust/core/src/serde/logical_plan/to_proto.rs +++ b/ballista/rust/core/src/serde/logical_plan/to_proto.rs @@ -360,6 +360,9 @@ impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum { fractional: *fractional as u64, }) } + DataType::Map(_, _) => { + unimplemented!("Ballista does not yet support Map data type") + } } } } @@ -490,6 +493,7 @@ impl TryFrom<&DataType> for protobuf::scalar_type::Datatype { | DataType::Struct(_) | DataType::Union(_) | DataType::Dictionary(_, _) + | DataType::Map(_, _) | DataType::Decimal(_, _) => { return Err(proto_error(format!( "Error converting to Datatype to scalar type, {:?} is invalid as a datafusion scalar.", From 95a8c5874120b9b1c3be72c6c7dd75db15fe9c4a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 13 Oct 2021 15:31:26 -0400 Subject: [PATCH 06/10] Update to use new arrow apis --- .../src/physical_plan/expressions/in_list.rs | 18 ++++++++++-------- datafusion/src/physical_plan/hash_join.rs | 6 ++++-- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/datafusion/src/physical_plan/expressions/in_list.rs b/datafusion/src/physical_plan/expressions/in_list.rs index 00767c7a67079..b48af1890202a 100644 --- a/datafusion/src/physical_plan/expressions/in_list.rs +++ b/datafusion/src/physical_plan/expressions/in_list.rs @@ -47,15 +47,17 @@ macro_rules! compare_op_scalar { // same as $left.len() let buffer = unsafe { MutableBuffer::from_trusted_len_iter_bool(comparison) }; - let data = ArrayData::new( - DataType::Boolean, - $left.len(), - None, - null_bit_buffer, - 0, - vec![Buffer::from(buffer)], + let data = unsafe { + ArrayData::new_unchecked( + DataType::Boolean, + $left.len(), + None, + null_bit_buffer, + 0, + vec![Buffer::from(buffer)], vec![], - ); + ) + }; Ok(BooleanArray::from(data)) }}; } diff --git a/datafusion/src/physical_plan/hash_join.rs b/datafusion/src/physical_plan/hash_join.rs index 2a14093e7db8c..2ed0faaa357b4 100644 --- a/datafusion/src/physical_plan/hash_join.rs +++ b/datafusion/src/physical_plan/hash_join.rs @@ -678,11 +678,13 @@ fn build_join_indexes( let left = ArrayData::builder(DataType::UInt64) .len(left_indices.len()) .add_buffer(left_indices.finish()) - .build(); + .build() + .unwrap(); let right = ArrayData::builder(DataType::UInt32) .len(right_indices.len()) .add_buffer(right_indices.finish()) - .build(); + .build() + .unwrap(); Ok(( PrimitiveArray::::from(left), From b30a35f8793c4315dbd350ed580b6971879ef9aa Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 18 Oct 2021 12:53:30 -0400 Subject: [PATCH 07/10] Fixup sort preserving merge test again --- datafusion/src/physical_plan/sort_preserving_merge.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datafusion/src/physical_plan/sort_preserving_merge.rs b/datafusion/src/physical_plan/sort_preserving_merge.rs index 6d313b687b9b8..8427d9a80b6e5 100644 --- a/datafusion/src/physical_plan/sort_preserving_merge.rs +++ b/datafusion/src/physical_plan/sort_preserving_merge.rs @@ -971,7 +971,9 @@ mod tests { let basic = arrow::util::pretty::pretty_format_batches(&[basic]).unwrap(); let partition = arrow::util::pretty::pretty_format_batches(&[partition]).unwrap(); - assert_eq!(basic, partition); + assert_eq!(basic, partition, + "basic:\n\n{}\n\npartition:\n\n{}\n\n", + basic, partition); } // Split the provided record batch into multiple batch_size record batches From 4e39f6d8e26dcdf38728791ddd0b5dcd19ac3d1c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 18 Oct 2021 13:15:05 -0400 Subject: [PATCH 08/10] make sort deterministic --- datafusion/src/physical_plan/expressions/in_list.rs | 2 +- .../src/physical_plan/sort_preserving_merge.rs | 12 +++++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/datafusion/src/physical_plan/expressions/in_list.rs b/datafusion/src/physical_plan/expressions/in_list.rs index b48af1890202a..826ffa87ae835 100644 --- a/datafusion/src/physical_plan/expressions/in_list.rs +++ b/datafusion/src/physical_plan/expressions/in_list.rs @@ -55,7 +55,7 @@ macro_rules! compare_op_scalar { null_bit_buffer, 0, vec![Buffer::from(buffer)], - vec![], + vec![], ) }; Ok(BooleanArray::from(data)) diff --git a/datafusion/src/physical_plan/sort_preserving_merge.rs b/datafusion/src/physical_plan/sort_preserving_merge.rs index 8427d9a80b6e5..5aaf9789f6997 100644 --- a/datafusion/src/physical_plan/sort_preserving_merge.rs +++ b/datafusion/src/physical_plan/sort_preserving_merge.rs @@ -963,6 +963,10 @@ mod tests { expr: col("c7", &schema).unwrap(), options: SortOptions::default(), }, + PhysicalSortExpr { + expr: col("c12", &schema).unwrap(), + options: SortOptions::default(), + }, ]; let basic = basic_sort(csv.clone(), sort.clone()).await; @@ -971,9 +975,11 @@ mod tests { let basic = arrow::util::pretty::pretty_format_batches(&[basic]).unwrap(); let partition = arrow::util::pretty::pretty_format_batches(&[partition]).unwrap(); - assert_eq!(basic, partition, - "basic:\n\n{}\n\npartition:\n\n{}\n\n", - basic, partition); + assert_eq!( + basic, partition, + "basic:\n\n{}\n\npartition:\n\n{}\n\n", + basic, partition + ); } // Split the provided record batch into multiple batch_size record batches From f2b04ad46abb3bccaaf7dd562a7ba8f4fb1f5bb4 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 18 Oct 2021 13:15:43 -0400 Subject: [PATCH 09/10] less whitespace --- Cargo.toml | 1 - python/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5b930e4093ca9..d6da8c14cd964 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,4 +29,3 @@ members = [ ] exclude = ["python"] - diff --git a/python/Cargo.toml b/python/Cargo.toml index 3ce1baea92f3a..731edcb5c9e82 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -37,4 +37,3 @@ uuid = { version = "0.8", features = ["v4"] } [lib] name = "datafusion" crate-type = ["cdylib"] - From 2899c499078d5a592648f0f0755f45d15842350c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 18 Oct 2021 14:00:42 -0400 Subject: [PATCH 10/10] patch up avro --- datafusion/src/avro_to_arrow/arrow_array_reader.rs | 8 ++++++-- datafusion/src/avro_to_arrow/schema.rs | 1 + 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/datafusion/src/avro_to_arrow/arrow_array_reader.rs b/datafusion/src/avro_to_arrow/arrow_array_reader.rs index cc8ed8e669426..bb863d6311019 100644 --- a/datafusion/src/avro_to_arrow/arrow_array_reader.rs +++ b/datafusion/src/avro_to_arrow/arrow_array_reader.rs @@ -484,6 +484,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> { .add_buffer(bool_values.into()) .null_bit_buffer(bool_nulls.into()) .build() + .unwrap() } DataType::Int8 => self.read_primitive_list_values::(rows), DataType::Int16 => self.read_primitive_list_values::(rows), @@ -569,6 +570,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> { .null_bit_buffer(buf) .child_data(arrays.into_iter().map(|a| a.data().clone()).collect()) .build() + .unwrap() } datatype => { return Err(ArrowError::SchemaError(format!( @@ -583,7 +585,8 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> { .add_buffer(Buffer::from_slice_ref(&offsets)) .add_child_data(array_data) .null_bit_buffer(list_nulls.into()) - .build(); + .build() + .unwrap(); Ok(Arc::new(GenericListArray::::from(list_data))) } @@ -776,7 +779,8 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> { .child_data( arrays.into_iter().map(|a| a.data().clone()).collect(), ) - .build(); + .build() + .unwrap(); Ok(make_array(data)) } _ => Err(ArrowError::SchemaError(format!( diff --git a/datafusion/src/avro_to_arrow/schema.rs b/datafusion/src/avro_to_arrow/schema.rs index c2927f0829ba6..c6eda80170129 100644 --- a/datafusion/src/avro_to_arrow/schema.rs +++ b/datafusion/src/avro_to_arrow/schema.rs @@ -213,6 +213,7 @@ fn default_field_name(dt: &DataType) -> &str { DataType::Struct(_) => "struct", DataType::Union(_) => "union", DataType::Dictionary(_, _) => "map", + DataType::Map(_, _) => unimplemented!("Map support not implemented"), DataType::Decimal(_, _) => "decimal", } }