From 0d7b45ff3171b15a62c10d89786a5834373a7468 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 27 May 2022 10:29:52 +0100 Subject: [PATCH 1/3] Prepare for arrow 15 release --- Cargo.toml | 4 ++++ datafusion/common/src/scalar.rs | 2 +- .../core/src/avro_to_arrow/arrow_array_reader.rs | 9 ++++----- datafusion/core/src/avro_to_arrow/schema.rs | 5 +++-- datafusion/core/src/physical_plan/common.rs | 2 +- .../core/src/physical_plan/file_format/parquet.rs | 13 +++++++++---- datafusion/physical-expr/src/expressions/case.rs | 4 ++-- datafusion/proto/proto/datafusion.proto | 1 + datafusion/proto/src/from_proto.rs | 11 +++++++++-- datafusion/proto/src/lib.rs | 4 ++++ datafusion/proto/src/to_proto.rs | 10 ++++------ 11 files changed, 42 insertions(+), 23 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index db2cc46e9079b..b7e6258f15a01 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,3 +35,7 @@ exclude = ["datafusion-cli"] codegen-units = 1 lto = true +[patch.crates-io] +arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "8e1666a8206f2eea4dd4e55c9365859c6a32a3f0"} +arrow-flight = { git = "https://github.com/apache/arrow-rs.git", rev = "8e1666a8206f2eea4dd4e55c9365859c6a32a3f0"} +parquet = { git = "https://github.com/apache/arrow-rs.git", rev = "8e1666a8206f2eea4dd4e55c9365859c6a32a3f0"} diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index 758cc18bb1ca9..775f2ef8891b4 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -1059,7 +1059,7 @@ impl ScalarValue { let offsets_array = offsets.finish(); let array_data = ArrayDataBuilder::new(data_type.clone()) .len(offsets_array.len() - 1) - .null_bit_buffer(valid.finish()) + .null_bit_buffer(Some(valid.finish())) .add_buffer(offsets_array.data().buffers()[0].clone()) .add_child_data(flat_array.data().clone()); diff --git a/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs b/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs index dfaa98555a6ea..663dffa155685 100644 --- a/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs +++ b/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs @@ -484,7 +484,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> { ArrayData::builder(list_field.data_type().clone()) .len(valid_len) .add_buffer(bool_values.into()) - .null_bit_buffer(bool_nulls.into()) + .null_bit_buffer(Some(bool_nulls.into())) .build() .unwrap() } @@ -567,10 +567,9 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> { let arrays = self.build_struct_array(rows.as_slice(), fields.as_slice(), &[])?; let data_type = DataType::Struct(fields.clone()); - let buf = null_buffer.into(); ArrayDataBuilder::new(data_type) .len(rows.len()) - .null_bit_buffer(buf) + .null_bit_buffer(Some(null_buffer.into())) .child_data(arrays.into_iter().map(|a| a.data().clone()).collect()) .build() .unwrap() @@ -587,7 +586,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> { .len(list_len) .add_buffer(Buffer::from_slice_ref(&offsets)) .add_child_data(array_data) - .null_bit_buffer(list_nulls.into()) + .null_bit_buffer(Some(list_nulls.into())) .build() .unwrap(); Ok(Arc::new(GenericListArray::::from(list_data))) @@ -778,7 +777,7 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> { let data_type = DataType::Struct(fields.clone()); let data = ArrayDataBuilder::new(data_type) .len(len) - .null_bit_buffer(null_buffer.into()) + .null_bit_buffer(Some(null_buffer.into())) .child_data( arrays.into_iter().map(|a| a.data().clone()).collect(), ) diff --git a/datafusion/core/src/avro_to_arrow/schema.rs b/datafusion/core/src/avro_to_arrow/schema.rs index 2e9a17de38db2..dd8b90776bc67 100644 --- a/datafusion/core/src/avro_to_arrow/schema.rs +++ b/datafusion/core/src/avro_to_arrow/schema.rs @@ -103,7 +103,8 @@ fn schema_to_field_with_props( .iter() .map(|s| schema_to_field_with_props(s, None, has_nullable, None)) .collect::>>()?; - DataType::Union(fields, UnionMode::Dense) + let type_ids = (0_i8..fields.len() as i8).collect(); + DataType::Union(fields, type_ids, UnionMode::Dense) } } AvroSchema::Record { name, fields, .. } => { @@ -212,7 +213,7 @@ fn default_field_name(dt: &DataType) -> &str { DataType::FixedSizeList(_, _) => "fixed_size_list", DataType::LargeList(_) => "largelist", DataType::Struct(_) => "struct", - DataType::Union(_, _) => "union", + DataType::Union(_, _, _) => "union", DataType::Dictionary(_, _) => "map", DataType::Map(_, _) => unimplemented!("Map support not implemented"), DataType::Decimal(_, _) => "decimal", diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs index 24df647dcde99..e5230a5c15164 100644 --- a/datafusion/core/src/physical_plan/common.rs +++ b/datafusion/core/src/physical_plan/common.rs @@ -365,7 +365,7 @@ mod tests { let expected = Statistics { is_exact: true, num_rows: Some(3), - total_byte_size: Some(416), // this might change a bit if the way we compute the size changes + total_byte_size: Some(464), // this might change a bit if the way we compute the size changes column_statistics: Some(vec![ ColumnStatistics { distinct_count: None, diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index 0931484ef7708..b700f4e5db3c2 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -37,8 +37,9 @@ use futures::{Stream, StreamExt, TryStreamExt}; use log::debug; use parquet::arrow::{ arrow_reader::ParquetRecordBatchReader, ArrowReader, ArrowWriter, - ParquetFileArrowReader, + ParquetFileArrowReader, ProjectionMask, }; +use parquet::file::reader::FileReader; use parquet::file::{ metadata::RowGroupMetaData, properties::WriterProperties, reader::SerializedFileReader, serialized_reader::ReadOptionsBuilder, @@ -352,14 +353,18 @@ impl ParquetExecStream { opt.build(), )?; + let file_metadata = file_reader.metadata().file_metadata(); + let parquet_schema = file_metadata.schema_descr_ptr(); + let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader)); + let arrow_schema = arrow_reader.get_schema()?; let adapted_projections = self .adapter - .map_projections(&arrow_reader.get_schema()?, &self.projection)?; + .map_projections(&arrow_schema, &self.projection)?; - let reader = arrow_reader - .get_record_reader_by_columns(adapted_projections, self.batch_size)?; + let mask = ProjectionMask::roots(&parquet_schema, adapted_projections); + let reader = arrow_reader.get_record_reader_by_columns(mask, self.batch_size)?; Ok(reader) } diff --git a/datafusion/physical-expr/src/expressions/case.rs b/datafusion/physical-expr/src/expressions/case.rs index e7db10d1772a0..d2a0fac994834 100644 --- a/datafusion/physical-expr/src/expressions/case.rs +++ b/datafusion/physical-expr/src/expressions/case.rs @@ -594,10 +594,10 @@ mod tests { .collect(); //let valid_array = vec![true, false, false, true, false, tru - let null_buffer = Buffer::from_slice_ref(&[0b00101001u8]); + let null_buffer = Buffer::from([0b00101001u8]); let load4 = ArrayDataBuilder::new(load4.data_type().clone()) .len(load4.len()) - .null_bit_buffer(null_buffer) + .null_bit_buffer(Some(null_buffer)) .buffers(load4.data().buffers().to_vec()) .build() .unwrap(); diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index a4b1863615ca0..8ea613510ab08 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -398,6 +398,7 @@ enum UnionMode{ message Union{ repeated Field union_types = 1; UnionMode union_mode = 2; + repeated int32 type_ids = 3; } message ScalarListValue{ diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs index fd72db4edac46..6717e5b47a5ac 100644 --- a/datafusion/proto/src/from_proto.rs +++ b/datafusion/proto/src/from_proto.rs @@ -338,9 +338,16 @@ impl TryFrom<&protobuf::arrow_type::ArrowTypeEnum> for DataType { let union_types = union .union_types .iter() - .map(|field| field.try_into()) + .map(TryInto::try_into) .collect::, _>>()?; - DataType::Union(union_types, union_mode) + + // Default to index based type ids if not provided + let type_ids = match union.type_ids.is_empty() { + true => (0..union_types.len() as i8).collect(), + false => union.type_ids.iter().map(|i| *i as i8).collect(), + }; + + DataType::Union(union_types, type_ids, union_mode) } arrow_type::ArrowTypeEnum::Dictionary(dict) => { let key_datatype = dict.as_ref().key.as_deref().required("key")?; diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs index 3b042da423b01..0fc6cb3a90407 100644 --- a/datafusion/proto/src/lib.rs +++ b/datafusion/proto/src/lib.rs @@ -362,6 +362,7 @@ mod roundtrip_tests { Field::new("name", DataType::Utf8, false), Field::new("datatype", DataType::Binary, false), ], + vec![0, 2, 3], UnionMode::Dense, ), DataType::Union( @@ -379,6 +380,7 @@ mod roundtrip_tests { true, ), ], + vec![1, 2, 3], UnionMode::Sparse, ), DataType::Dictionary( @@ -514,6 +516,7 @@ mod roundtrip_tests { Field::new("name", DataType::Utf8, false), Field::new("datatype", DataType::Binary, false), ], + vec![7, 5, 3], UnionMode::Sparse, ), DataType::Union( @@ -531,6 +534,7 @@ mod roundtrip_tests { true, ), ], + vec![5, 8, 1], UnionMode::Dense, ), DataType::Dictionary( diff --git a/datafusion/proto/src/to_proto.rs b/datafusion/proto/src/to_proto.rs index 7aa4278b39a49..70c4d57c4c1d7 100644 --- a/datafusion/proto/src/to_proto.rs +++ b/datafusion/proto/src/to_proto.rs @@ -200,17 +200,15 @@ impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum { .map(|field| field.into()) .collect::>(), }), - DataType::Union(union_types, union_mode) => { + DataType::Union(union_types, type_ids, union_mode) => { let union_mode = match union_mode { UnionMode::Sparse => protobuf::UnionMode::Sparse, UnionMode::Dense => protobuf::UnionMode::Dense, }; Self::Union(protobuf::Union { - union_types: union_types - .iter() - .map(|field| field.into()) - .collect::>(), + union_types: union_types.iter().map(Into::into).collect(), union_mode: union_mode.into(), + type_ids: type_ids.iter().map(|x| *x as i32).collect(), }) } DataType::Dictionary(key_type, value_type) => { @@ -1188,7 +1186,7 @@ impl TryFrom<&DataType> for protobuf::scalar_type::Datatype { | DataType::FixedSizeList(_, _) | DataType::LargeList(_) | DataType::Struct(_) - | DataType::Union(_, _) + | DataType::Union(_, _, _) | DataType::Dictionary(_, _) | DataType::Map(_, _) | DataType::Decimal(_, _) => { From 63b03371393521cad687754180f9472b42d0e0c3 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 30 May 2022 16:05:24 +0100 Subject: [PATCH 2/3] Update to arrow 15.0.0 --- Cargo.toml | 5 ----- datafusion-cli/Cargo.toml | 2 +- datafusion-examples/Cargo.toml | 2 +- datafusion/common/Cargo.toml | 4 ++-- datafusion/core/Cargo.toml | 4 ++-- datafusion/core/fuzz-utils/Cargo.toml | 2 +- datafusion/expr/Cargo.toml | 2 +- datafusion/jit/Cargo.toml | 2 +- datafusion/physical-expr/Cargo.toml | 2 +- datafusion/proto/Cargo.toml | 2 +- datafusion/row/Cargo.toml | 2 +- datafusion/sql/Cargo.toml | 2 +- 12 files changed, 13 insertions(+), 18 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b7e6258f15a01..1cc7aa6eb3b88 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,8 +34,3 @@ exclude = ["datafusion-cli"] [profile.release] codegen-units = 1 lto = true - -[patch.crates-io] -arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "8e1666a8206f2eea4dd4e55c9365859c6a32a3f0"} -arrow-flight = { git = "https://github.com/apache/arrow-rs.git", rev = "8e1666a8206f2eea4dd4e55c9365859c6a32a3f0"} -parquet = { git = "https://github.com/apache/arrow-rs.git", rev = "8e1666a8206f2eea4dd4e55c9365859c6a32a3f0"} diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 0c210bc0e306c..d2d2ba5e4bc79 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -29,7 +29,7 @@ rust-version = "1.59" readme = "README.md" [dependencies] -arrow = { version = "14.0.0" } +arrow = { version = "15.0.0" } clap = { version = "3", features = ["derive", "cargo"] } datafusion = { path = "../datafusion/core", version = "8.0.0" } dirs = "4.0.0" diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index 963efdf0dcd84..399248f1d5216 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -34,7 +34,7 @@ path = "examples/avro_sql.rs" required-features = ["datafusion/avro"] [dev-dependencies] -arrow-flight = { version = "14.0.0" } +arrow-flight = { version = "15.0.0" } async-trait = "0.1.41" datafusion = { path = "../datafusion/core" } futures = "0.3" diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index a8f1032fd00c0..bf52bc26d085f 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -38,10 +38,10 @@ jit = ["cranelift-module"] pyarrow = ["pyo3"] [dependencies] -arrow = { version = "14.0.0", features = ["prettyprint"] } +arrow = { version = "15.0.0", features = ["prettyprint"] } avro-rs = { version = "0.13", features = ["snappy"], optional = true } cranelift-module = { version = "0.84.0", optional = true } ordered-float = "3.0" -parquet = { version = "14.0.0", features = ["arrow"], optional = true } +parquet = { version = "15.0.0", features = ["arrow"], optional = true } pyo3 = { version = "0.16", optional = true } sqlparser = "0.17" diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index f8170443e2135..4f5120635affe 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -55,7 +55,7 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion [dependencies] ahash = { version = "0.7", default-features = false } -arrow = { version = "14.0.0", features = ["prettyprint"] } +arrow = { version = "15.0.0", features = ["prettyprint"] } async-trait = "0.1.41" avro-rs = { version = "0.13", features = ["snappy"], optional = true } chrono = { version = "0.4", default-features = false } @@ -74,7 +74,7 @@ num-traits = { version = "0.2", optional = true } num_cpus = "1.13.0" ordered-float = "3.0" parking_lot = "0.12" -parquet = { version = "14.0.0", features = ["arrow"] } +parquet = { version = "15.0.0", features = ["arrow"] } paste = "^1.0" pin-project-lite = "^0.2.7" pyo3 = { version = "0.16", optional = true } diff --git a/datafusion/core/fuzz-utils/Cargo.toml b/datafusion/core/fuzz-utils/Cargo.toml index 4089ad74e110b..aae7c108a469e 100644 --- a/datafusion/core/fuzz-utils/Cargo.toml +++ b/datafusion/core/fuzz-utils/Cargo.toml @@ -23,6 +23,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -arrow = { version = "14.0.0", features = ["prettyprint"] } +arrow = { version = "15.0.0", features = ["prettyprint"] } env_logger = "0.9.0" rand = "0.8" diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml index 22c73e6a97f1d..8252013ef34c4 100644 --- a/datafusion/expr/Cargo.toml +++ b/datafusion/expr/Cargo.toml @@ -36,6 +36,6 @@ path = "src/lib.rs" [dependencies] ahash = { version = "0.7", default-features = false } -arrow = { version = "14.0.0", features = ["prettyprint"] } +arrow = { version = "15.0.0", features = ["prettyprint"] } datafusion-common = { path = "../common", version = "8.0.0" } sqlparser = "0.17" diff --git a/datafusion/jit/Cargo.toml b/datafusion/jit/Cargo.toml index a8ca329bfcea3..ba45a799e9a92 100644 --- a/datafusion/jit/Cargo.toml +++ b/datafusion/jit/Cargo.toml @@ -36,7 +36,7 @@ path = "src/lib.rs" jit = [] [dependencies] -arrow = { version = "14.0.0" } +arrow = { version = "15.0.0" } cranelift = "0.84.0" cranelift-jit = "0.84.0" cranelift-module = "0.84.0" diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index c6c286f0b7748..f4226ba339e44 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -40,7 +40,7 @@ unicode_expressions = ["unicode-segmentation"] [dependencies] ahash = { version = "0.7", default-features = false } -arrow = { version = "14.0.0", features = ["prettyprint"] } +arrow = { version = "15.0.0", features = ["prettyprint"] } blake2 = { version = "^0.10.2", optional = true } blake3 = { version = "1.0", optional = true } chrono = { version = "0.4", default-features = false } diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml index 58a69124dbf4a..2ca17e2f35de7 100644 --- a/datafusion/proto/Cargo.toml +++ b/datafusion/proto/Cargo.toml @@ -35,7 +35,7 @@ path = "src/lib.rs" [features] [dependencies] -arrow = { version = "14.0.0" } +arrow = { version = "15.0.0" } datafusion = { path = "../core", version = "8.0.0" } datafusion-common = { path = "../common", version = "8.0.0" } datafusion-expr = { path = "../expr", version = "8.0.0" } diff --git a/datafusion/row/Cargo.toml b/datafusion/row/Cargo.toml index f9a150699ec62..bb755ecea656f 100644 --- a/datafusion/row/Cargo.toml +++ b/datafusion/row/Cargo.toml @@ -37,7 +37,7 @@ path = "src/lib.rs" jit = ["datafusion-jit"] [dependencies] -arrow = { version = "14.0.0" } +arrow = { version = "15.0.0" } datafusion-common = { path = "../common", version = "8.0.0" } datafusion-jit = { path = "../jit", version = "8.0.0", optional = true } paste = "^1.0" diff --git a/datafusion/sql/Cargo.toml b/datafusion/sql/Cargo.toml index 673823c37ad22..cd2172a65b9f4 100644 --- a/datafusion/sql/Cargo.toml +++ b/datafusion/sql/Cargo.toml @@ -38,7 +38,7 @@ unicode_expressions = [] [dependencies] ahash = { version = "0.7", default-features = false } -arrow = { version = "14.0.0", features = ["prettyprint"] } +arrow = { version = "15.0.0", features = ["prettyprint"] } datafusion-common = { path = "../common", version = "8.0.0" } datafusion-expr = { path = "../expr", version = "8.0.0" } hashbrown = "0.12" From 539b31ad7f41f9f10f590f21c5aecf33a5bd0261 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 30 May 2022 16:12:36 +0100 Subject: [PATCH 3/3] Update ballista pin --- dev/build-arrow-ballista.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/build-arrow-ballista.sh b/dev/build-arrow-ballista.sh index 0c39226273102..12b6e9bc00549 100755 --- a/dev/build-arrow-ballista.sh +++ b/dev/build-arrow-ballista.sh @@ -24,7 +24,7 @@ rm -rf arrow-ballista 2>/dev/null # clone the repo # TODO make repo/branch configurable -git clone https://github.com/apache/arrow-ballista +git clone https://github.com/tustvold/arrow-ballista -b arrow-15 # update dependencies to local crates python ./dev/make-ballista-deps-local.py