From 670b140ba4fd9b76d65d0b98e1570e625bc2f263 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 13 May 2022 13:01:25 -0400 Subject: [PATCH 1/7] TEMP: Patch to use apache repo --- Cargo.toml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index feb3ebe5c5713..fe55edef493ac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,3 +38,8 @@ exclude = ["ballista-cli", "datafusion-cli"] [profile.release] codegen-units = 1 lto = true + +[patch.crates-io] +arrow = { git = "https://github.com/apache/arrow-rs.git", tag="14.0.0" } +parquet = { git = "https://github.com/apache/arrow-rs.git", tag="14.0.0" } +arrow-flight = { git = "https://github.com/apache/arrow-rs.git", tag="14.0.0" } From fcd7a7dbf71adea6313a7e0de26137cbdbeaa83a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 13 May 2022 13:01:34 -0400 Subject: [PATCH 2/7] Update to arrow 14.0.0 --- ballista-cli/Cargo.toml | 2 +- ballista/rust/core/Cargo.toml | 2 +- ballista/rust/executor/Cargo.toml | 4 ++-- datafusion-cli/Cargo.toml | 2 +- datafusion-examples/Cargo.toml | 2 +- datafusion/common/Cargo.toml | 4 ++-- datafusion/core/Cargo.toml | 4 ++-- datafusion/core/fuzz-utils/Cargo.toml | 2 +- datafusion/expr/Cargo.toml | 2 +- datafusion/jit/Cargo.toml | 2 +- datafusion/physical-expr/Cargo.toml | 2 +- datafusion/row/Cargo.toml | 2 +- 12 files changed, 15 insertions(+), 15 deletions(-) diff --git a/ballista-cli/Cargo.toml b/ballista-cli/Cargo.toml index 7537dc4b2b83a..de8ee10ca5254 100644 --- a/ballista-cli/Cargo.toml +++ b/ballista-cli/Cargo.toml @@ -29,7 +29,7 @@ rust-version = "1.59" readme = "README.md" [dependencies] -arrow = { version = "13" } +arrow = { version = "14.0.0" } ballista = { path = "../ballista/rust/client", version = "0.7.0" } clap = { version = "3", features = ["derive", "cargo"] } datafusion = { path = "../datafusion/core", version = "8.0.0" } diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml index db2b60ca4182e..3d43aceb4f0f2 100644 --- a/ballista/rust/core/Cargo.toml +++ b/ballista/rust/core/Cargo.toml @@ -35,7 +35,7 @@ simd = ["datafusion/simd"] [dependencies] ahash = { version = "0.7", default-features = false } -arrow-flight = { version = "13" } +arrow-flight = { version = "14.0.0" } async-trait = "0.1.41" chrono = { version = "0.4", default-features = false } clap = { version = "3", features = ["derive", "cargo"] } diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml index 3a2b45c5c20f5..959a55078dabe 100644 --- a/ballista/rust/executor/Cargo.toml +++ b/ballista/rust/executor/Cargo.toml @@ -34,8 +34,8 @@ snmalloc = ["snmalloc-rs"] [dependencies] anyhow = "1" -arrow = { version = "13" } -arrow-flight = { version = "13" } +arrow = { version = "14.0.0" } +arrow-flight = { version = "14.0.0" } async-trait = "0.1.41" ballista-core = { path = "../core", version = "0.7.0" } chrono = { version = "0.4", default-features = false } diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 82fadcd0d072f..b57c78bff1367 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -29,7 +29,7 @@ rust-version = "1.59" readme = "README.md" [dependencies] -arrow = { version = "13" } +arrow = { version = "14.0.0" } clap = { version = "3", features = ["derive", "cargo"] } datafusion = { path = "../datafusion/core", version = "8.0.0" } dirs = "4.0.0" diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index b6e5424d79e6a..963efdf0dcd84 100644 --- a/datafusion-examples/Cargo.toml +++ 
b/datafusion-examples/Cargo.toml @@ -34,7 +34,7 @@ path = "examples/avro_sql.rs" required-features = ["datafusion/avro"] [dev-dependencies] -arrow-flight = { version = "13" } +arrow-flight = { version = "14.0.0" } async-trait = "0.1.41" datafusion = { path = "../datafusion/core" } futures = "0.3" diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index 652dde5fbd5e1..6a46a11f61234 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -38,10 +38,10 @@ jit = ["cranelift-module"] pyarrow = ["pyo3"] [dependencies] -arrow = { version = "13", features = ["prettyprint"] } +arrow = { version = "14.0.0", features = ["prettyprint"] } avro-rs = { version = "0.13", features = ["snappy"], optional = true } cranelift-module = { version = "0.83.0", optional = true } ordered-float = "3.0" -parquet = { version = "13", features = ["arrow"], optional = true } +parquet = { version = "14.0.0", features = ["arrow"], optional = true } pyo3 = { version = "0.16", optional = true } sqlparser = "0.17" diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 259095c29dad4..c8a21d8bc8ed8 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -55,7 +55,7 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions"] [dependencies] ahash = { version = "0.7", default-features = false } -arrow = { version = "13", features = ["prettyprint"] } +arrow = { version = "14.0.0", features = ["prettyprint"] } async-trait = "0.1.41" avro-rs = { version = "0.13", features = ["snappy"], optional = true } chrono = { version = "0.4", default-features = false } @@ -73,7 +73,7 @@ num-traits = { version = "0.2", optional = true } num_cpus = "1.13.0" ordered-float = "3.0" parking_lot = "0.12" -parquet = { version = "13", features = ["arrow"] } +parquet = { version = "14.0.0", features = ["arrow"] } paste = "^1.0" pin-project-lite = "^0.2.7" pyo3 = { version = "0.16", optional = true } diff --git a/datafusion/core/fuzz-utils/Cargo.toml b/datafusion/core/fuzz-utils/Cargo.toml index 858543d409da1..4089ad74e110b 100644 --- a/datafusion/core/fuzz-utils/Cargo.toml +++ b/datafusion/core/fuzz-utils/Cargo.toml @@ -23,6 +23,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -arrow = { version = "13", features = ["prettyprint"] } +arrow = { version = "14.0.0", features = ["prettyprint"] } env_logger = "0.9.0" rand = "0.8" diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml index 25786e670987c..22c73e6a97f1d 100644 --- a/datafusion/expr/Cargo.toml +++ b/datafusion/expr/Cargo.toml @@ -36,6 +36,6 @@ path = "src/lib.rs" [dependencies] ahash = { version = "0.7", default-features = false } -arrow = { version = "13", features = ["prettyprint"] } +arrow = { version = "14.0.0", features = ["prettyprint"] } datafusion-common = { path = "../common", version = "8.0.0" } sqlparser = "0.17" diff --git a/datafusion/jit/Cargo.toml b/datafusion/jit/Cargo.toml index e3278be6d2966..260e78d1ba3e8 100644 --- a/datafusion/jit/Cargo.toml +++ b/datafusion/jit/Cargo.toml @@ -36,7 +36,7 @@ path = "src/lib.rs" jit = [] [dependencies] -arrow = { version = "13" } +arrow = { version = "14.0.0" } cranelift = "0.83.0" cranelift-jit = "0.83.0" cranelift-module = "0.83.0" diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index f9a472555060b..c6c286f0b7748 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -40,7 
+40,7 @@ unicode_expressions = ["unicode-segmentation"] [dependencies] ahash = { version = "0.7", default-features = false } -arrow = { version = "13", features = ["prettyprint"] } +arrow = { version = "14.0.0", features = ["prettyprint"] } blake2 = { version = "^0.10.2", optional = true } blake3 = { version = "1.0", optional = true } chrono = { version = "0.4", default-features = false } diff --git a/datafusion/row/Cargo.toml b/datafusion/row/Cargo.toml index 13b485fb7cd58..f9a150699ec62 100644 --- a/datafusion/row/Cargo.toml +++ b/datafusion/row/Cargo.toml @@ -37,7 +37,7 @@ path = "src/lib.rs" jit = ["datafusion-jit"] [dependencies] -arrow = { version = "13" } +arrow = { version = "14.0.0" } datafusion-common = { path = "../common", version = "8.0.0" } datafusion-jit = { path = "../jit", version = "8.0.0", optional = true } paste = "^1.0" From aa6717e5541b5416ac8ab37516a41f525978d186 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 13 May 2022 13:08:04 -0400 Subject: [PATCH 3/7] Consolidate to single OffsetSizeTrait --- .../src/aggregate/approx_distinct.rs | 16 ++++++------ .../physical-expr/src/crypto_expressions.rs | 5 ++-- .../physical-expr/src/datetime_expressions.rs | 4 +-- .../physical-expr/src/expressions/binary.rs | 4 +-- .../physical-expr/src/expressions/in_list.rs | 8 +++--- .../physical-expr/src/regex_expressions.rs | 6 ++--- .../physical-expr/src/string_expressions.rs | 26 +++++++++---------- .../physical-expr/src/unicode_expressions.rs | 22 +++++++--------- 8 files changed, 44 insertions(+), 47 deletions(-) diff --git a/datafusion/physical-expr/src/aggregate/approx_distinct.rs b/datafusion/physical-expr/src/aggregate/approx_distinct.rs index 3912584479422..c67d1c9d35aa7 100644 --- a/datafusion/physical-expr/src/aggregate/approx_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/approx_distinct.rs @@ -21,8 +21,8 @@ use super::hyperloglog::HyperLogLog; use crate::expressions::format_state_name; use crate::{AggregateExpr, PhysicalExpr}; use arrow::array::{ - ArrayRef, BinaryArray, BinaryOffsetSizeTrait, GenericBinaryArray, GenericStringArray, - PrimitiveArray, StringOffsetSizeTrait, + ArrayRef, BinaryArray, GenericBinaryArray, GenericStringArray, OffsetSizeTrait, + PrimitiveArray, }; use arrow::datatypes::{ ArrowPrimitiveType, DataType, Field, Int16Type, Int32Type, Int64Type, Int8Type, @@ -119,7 +119,7 @@ impl AggregateExpr for ApproxDistinct { #[derive(Debug)] struct BinaryHLLAccumulator where - T: BinaryOffsetSizeTrait, + T: OffsetSizeTrait, { hll: HyperLogLog>, phantom_data: PhantomData, @@ -127,7 +127,7 @@ where impl BinaryHLLAccumulator where - T: BinaryOffsetSizeTrait, + T: OffsetSizeTrait, { /// new approx_distinct accumulator pub fn new() -> Self { @@ -141,7 +141,7 @@ where #[derive(Debug)] struct StringHLLAccumulator where - T: StringOffsetSizeTrait, + T: OffsetSizeTrait, { hll: HyperLogLog, phantom_data: PhantomData, @@ -149,7 +149,7 @@ where impl StringHLLAccumulator where - T: StringOffsetSizeTrait, + T: OffsetSizeTrait, { /// new approx_distinct accumulator pub fn new() -> Self { @@ -259,7 +259,7 @@ macro_rules! 
downcast_value { impl Accumulator for BinaryHLLAccumulator where - T: BinaryOffsetSizeTrait, + T: OffsetSizeTrait, { fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { let array: &GenericBinaryArray = @@ -275,7 +275,7 @@ where impl Accumulator for StringHLLAccumulator where - T: StringOffsetSizeTrait, + T: OffsetSizeTrait, { fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { let array: &GenericStringArray = diff --git a/datafusion/physical-expr/src/crypto_expressions.rs b/datafusion/physical-expr/src/crypto_expressions.rs index 95bedd4af41db..e0314317c25f1 100644 --- a/datafusion/physical-expr/src/crypto_expressions.rs +++ b/datafusion/physical-expr/src/crypto_expressions.rs @@ -19,8 +19,7 @@ use arrow::{ array::{ - Array, ArrayRef, BinaryArray, GenericStringArray, StringArray, - StringOffsetSizeTrait, + Array, ArrayRef, BinaryArray, GenericStringArray, OffsetSizeTrait, StringArray, }, datatypes::DataType, }; @@ -127,7 +126,7 @@ impl DigestAlgorithm { /// digest a string array to their hash values fn digest_array(self, value: &dyn Array) -> Result where - T: StringOffsetSizeTrait, + T: OffsetSizeTrait, { let input_value = value .as_any() diff --git a/datafusion/physical-expr/src/datetime_expressions.rs b/datafusion/physical-expr/src/datetime_expressions.rs index 9a8351d0d3590..1e1cfd07f3b2d 100644 --- a/datafusion/physical-expr/src/datetime_expressions.rs +++ b/datafusion/physical-expr/src/datetime_expressions.rs @@ -18,7 +18,7 @@ //! DateTime expressions use arrow::{ - array::{Array, ArrayRef, GenericStringArray, PrimitiveArray, StringOffsetSizeTrait}, + array::{Array, ArrayRef, GenericStringArray, OffsetSizeTrait, PrimitiveArray}, compute::kernels::cast_utils::string_to_timestamp_nanos, datatypes::{ ArrowPrimitiveType, DataType, TimestampMicrosecondType, TimestampMillisecondType, @@ -57,7 +57,7 @@ pub(crate) fn unary_string_to_primitive_function<'a, T, O, F>( ) -> Result> where O: ArrowPrimitiveType, - T: StringOffsetSizeTrait, + T: OffsetSizeTrait, F: Fn(&'a str) -> Result, { if args.len() != 1 { diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index b7a0da3e0bbef..e507f8fe8c3a2 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -1351,7 +1351,7 @@ where .collect()) } -fn is_distinct_from_utf8( +fn is_distinct_from_utf8( left: &GenericStringArray, right: &GenericStringArray, ) -> Result { @@ -1397,7 +1397,7 @@ where .collect()) } -fn is_not_distinct_from_utf8( +fn is_not_distinct_from_utf8( left: &GenericStringArray, right: &GenericStringArray, ) -> Result { diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index 7094a718d0001..a872f65c6c115 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -24,7 +24,7 @@ use std::sync::Arc; use arrow::array::GenericStringArray; use arrow::array::{ ArrayRef, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array, - Int64Array, Int8Array, StringOffsetSizeTrait, UInt16Array, UInt32Array, UInt64Array, + Int64Array, Int8Array, OffsetSizeTrait, UInt16Array, UInt32Array, UInt64Array, UInt8Array, }; use arrow::datatypes::ArrowPrimitiveType; @@ -250,14 +250,14 @@ fn not_in_list_primitive( } // whether each value on the left (can be null) is contained in the non-null list -fn in_list_utf8( +fn in_list_utf8( array: &GenericStringArray, values: 
&[&str], ) -> Result { compare_op_scalar!(array, values, |x, v: &[&str]| v.contains(&x)) } -fn not_in_list_utf8( +fn not_in_list_utf8( array: &GenericStringArray, values: &[&str], ) -> Result { @@ -341,7 +341,7 @@ impl InListExpr { /// Compare for specific utf8 types #[allow(clippy::unnecessary_wraps)] - fn compare_utf8( + fn compare_utf8( &self, array: ArrayRef, list_values: Vec, diff --git a/datafusion/physical-expr/src/regex_expressions.rs b/datafusion/physical-expr/src/regex_expressions.rs index 69de68e166f65..c53c946155392 100644 --- a/datafusion/physical-expr/src/regex_expressions.rs +++ b/datafusion/physical-expr/src/regex_expressions.rs @@ -21,7 +21,7 @@ //! Regex expressions -use arrow::array::{ArrayRef, GenericStringArray, StringOffsetSizeTrait}; +use arrow::array::{ArrayRef, GenericStringArray, OffsetSizeTrait}; use arrow::compute; use datafusion_common::{DataFusionError, Result}; use hashbrown::HashMap; @@ -45,7 +45,7 @@ macro_rules! downcast_string_arg { } /// extract a specific group from a string column, using a regular expression -pub fn regexp_match(args: &[ArrayRef]) -> Result { +pub fn regexp_match(args: &[ArrayRef]) -> Result { match args.len() { 2 => { let values = downcast_string_arg!(args[0], "string", T); @@ -79,7 +79,7 @@ fn regex_replace_posix_groups(replacement: &str) -> String { /// Replaces substring(s) matching a POSIX regular expression. /// /// example: `regexp_replace('Thomas', '.[mN]a.', 'M') = 'ThM'` -pub fn regexp_replace(args: &[ArrayRef]) -> Result { +pub fn regexp_replace(args: &[ArrayRef]) -> Result { // creating Regex is expensive so create hashmap for memoization let mut patterns: HashMap = HashMap::new(); diff --git a/datafusion/physical-expr/src/string_expressions.rs b/datafusion/physical-expr/src/string_expressions.rs index 1f369cdad3bca..c13a853bb07b0 100644 --- a/datafusion/physical-expr/src/string_expressions.rs +++ b/datafusion/physical-expr/src/string_expressions.rs @@ -24,7 +24,7 @@ use arrow::{ array::{ Array, ArrayRef, BooleanArray, GenericStringArray, Int32Array, Int64Array, - PrimitiveArray, StringArray, StringOffsetSizeTrait, + OffsetSizeTrait, PrimitiveArray, StringArray, }, datatypes::{ArrowNativeType, ArrowPrimitiveType, DataType}, }; @@ -98,8 +98,8 @@ pub(crate) fn unary_string_function<'a, T, O, F, R>( ) -> Result> where R: AsRef, - O: StringOffsetSizeTrait, - T: StringOffsetSizeTrait, + O: OffsetSizeTrait, + T: OffsetSizeTrait, F: Fn(&'a str) -> R, { if args.len() != 1 { @@ -167,7 +167,7 @@ where /// Returns the numeric code of the first character of the argument. /// ascii('x') = 120 -pub fn ascii(args: &[ArrayRef]) -> Result { +pub fn ascii(args: &[ArrayRef]) -> Result { let string_array = downcast_string_arg!(args[0], "string", T); let result = string_array @@ -185,7 +185,7 @@ pub fn ascii(args: &[ArrayRef]) -> Result { /// Removes the longest string containing only characters in characters (a space by default) from the start and end of string. /// btrim('xyxtrimyyx', 'xyz') = 'trim' -pub fn btrim(args: &[ArrayRef]) -> Result { +pub fn btrim(args: &[ArrayRef]) -> Result { match args.len() { 1 => { let string_array = downcast_string_arg!(args[0], "string", T); @@ -363,7 +363,7 @@ pub fn concat_ws(args: &[ArrayRef]) -> Result { /// Converts the first letter of each word to upper case and the rest to lower case. Words are sequences of alphanumeric characters separated by non-alphanumeric characters. 
/// initcap('hi THOMAS') = 'Hi Thomas' -pub fn initcap(args: &[ArrayRef]) -> Result { +pub fn initcap(args: &[ArrayRef]) -> Result { let string_array = downcast_string_arg!(args[0], "string", T); // first map is the iterator, second is for the `Option<_>` @@ -399,7 +399,7 @@ pub fn lower(args: &[ColumnarValue]) -> Result { /// Removes the longest string containing only characters in characters (a space by default) from the start of string. /// ltrim('zzzytest', 'xyz') = 'test' -pub fn ltrim(args: &[ArrayRef]) -> Result { +pub fn ltrim(args: &[ArrayRef]) -> Result { match args.len() { 1 => { let string_array = downcast_string_arg!(args[0], "string", T); @@ -438,7 +438,7 @@ pub fn ltrim(args: &[ArrayRef]) -> Result { /// Repeats string the specified number of times. /// repeat('Pg', 4) = 'PgPgPgPg' -pub fn repeat(args: &[ArrayRef]) -> Result { +pub fn repeat(args: &[ArrayRef]) -> Result { let string_array = downcast_string_arg!(args[0], "string", T); let number_array = downcast_arg!(args[1], "number", Int64Array); @@ -456,7 +456,7 @@ pub fn repeat(args: &[ArrayRef]) -> Result { /// Replaces all occurrences in string of substring from with substring to. /// replace('abcdefabcdef', 'cd', 'XX') = 'abXXefabXXef' -pub fn replace(args: &[ArrayRef]) -> Result { +pub fn replace(args: &[ArrayRef]) -> Result { let string_array = downcast_string_arg!(args[0], "string", T); let from_array = downcast_string_arg!(args[1], "from", T); let to_array = downcast_string_arg!(args[2], "to", T); @@ -476,7 +476,7 @@ pub fn replace(args: &[ArrayRef]) -> Result /// Removes the longest string containing only characters in characters (a space by default) from the end of string. /// rtrim('testxxzx', 'xyz') = 'test' -pub fn rtrim(args: &[ArrayRef]) -> Result { +pub fn rtrim(args: &[ArrayRef]) -> Result { match args.len() { 1 => { let string_array = downcast_string_arg!(args[0], "string", T); @@ -515,7 +515,7 @@ pub fn rtrim(args: &[ArrayRef]) -> Result { /// Splits string at occurrences of delimiter and returns the n'th field (counting from one). /// split_part('abc~@~def~@~ghi', '~@~', 2) = 'def' -pub fn split_part(args: &[ArrayRef]) -> Result { +pub fn split_part(args: &[ArrayRef]) -> Result { let string_array = downcast_string_arg!(args[0], "string", T); let delimiter_array = downcast_string_arg!(args[1], "delimiter", T); let n_array = downcast_arg!(args[2], "n", Int64Array); @@ -547,7 +547,7 @@ pub fn split_part(args: &[ArrayRef]) -> Result(args: &[ArrayRef]) -> Result { +pub fn starts_with(args: &[ArrayRef]) -> Result { let string_array = downcast_string_arg!(args[0], "string", T); let prefix_array = downcast_string_arg!(args[1], "prefix", T); @@ -567,7 +567,7 @@ pub fn starts_with(args: &[ArrayRef]) -> Result(args: &[ArrayRef]) -> Result where - T::Native: StringOffsetSizeTrait, + T::Native: OffsetSizeTrait, { let integer_array = downcast_primitive_array_arg!(args[0], "integer", T); diff --git a/datafusion/physical-expr/src/unicode_expressions.rs b/datafusion/physical-expr/src/unicode_expressions.rs index 86a2ef7ba9a06..0730d24f5b2ff 100644 --- a/datafusion/physical-expr/src/unicode_expressions.rs +++ b/datafusion/physical-expr/src/unicode_expressions.rs @@ -22,9 +22,7 @@ //! 
Unicode expressions use arrow::{ - array::{ - ArrayRef, GenericStringArray, Int64Array, PrimitiveArray, StringOffsetSizeTrait, - }, + array::{ArrayRef, GenericStringArray, Int64Array, OffsetSizeTrait, PrimitiveArray}, datatypes::{ArrowNativeType, ArrowPrimitiveType}, }; use datafusion_common::{DataFusionError, Result}; @@ -64,7 +62,7 @@ macro_rules! downcast_arg { /// character_length('josé') = 4 pub fn character_length(args: &[ArrayRef]) -> Result where - T::Native: StringOffsetSizeTrait, + T::Native: OffsetSizeTrait, { let string_array: &GenericStringArray = args[0] .as_any() @@ -89,7 +87,7 @@ where /// Returns first n characters in the string, or when n is negative, returns all but last |n| characters. /// left('abcde', 2) = 'ab' -pub fn left(args: &[ArrayRef]) -> Result { +pub fn left(args: &[ArrayRef]) -> Result { let string_array = downcast_string_arg!(args[0], "string", T); let n_array = downcast_arg!(args[1], "n", Int64Array); let result = string_array @@ -122,7 +120,7 @@ pub fn left(args: &[ArrayRef]) -> Result { /// Extends the string to length 'length' by prepending the characters fill (a space by default). If the string is already longer than length then it is truncated (on the right). /// lpad('hi', 5, 'xy') = 'xyxhi' -pub fn lpad(args: &[ArrayRef]) -> Result { +pub fn lpad(args: &[ArrayRef]) -> Result { match args.len() { 2 => { let string_array = downcast_string_arg!(args[0], "string", T); @@ -211,7 +209,7 @@ pub fn lpad(args: &[ArrayRef]) -> Result { /// Reverses the order of the characters in the string. /// reverse('abcde') = 'edcba' -pub fn reverse(args: &[ArrayRef]) -> Result { +pub fn reverse(args: &[ArrayRef]) -> Result { let string_array = downcast_string_arg!(args[0], "string", T); let result = string_array @@ -226,7 +224,7 @@ pub fn reverse(args: &[ArrayRef]) -> Result /// Returns last n characters in the string, or when n is negative, returns all but first |n| characters. /// right('abcde', 2) = 'de' -pub fn right(args: &[ArrayRef]) -> Result { +pub fn right(args: &[ArrayRef]) -> Result { let string_array = downcast_string_arg!(args[0], "string", T); let n_array = downcast_arg!(args[1], "n", Int64Array); @@ -274,7 +272,7 @@ pub fn right(args: &[ArrayRef]) -> Result { /// Extends the string to length 'length' by appending the characters fill (a space by default). If the string is already longer than length then it is truncated. /// rpad('hi', 5, 'xy') = 'hixyx' -pub fn rpad(args: &[ArrayRef]) -> Result { +pub fn rpad(args: &[ArrayRef]) -> Result { match args.len() { 2 => { let string_array = downcast_string_arg!(args[0], "string", T); @@ -353,7 +351,7 @@ pub fn rpad(args: &[ArrayRef]) -> Result { /// strpos('high', 'ig') = 2 pub fn strpos(args: &[ArrayRef]) -> Result where - T::Native: StringOffsetSizeTrait, + T::Native: OffsetSizeTrait, { let string_array: &GenericStringArray = args[0] .as_any() @@ -410,7 +408,7 @@ where /// Extracts the substring of string starting at the start'th character, and extending for count characters if that is specified. (Same as substring(string from start for count).) /// substr('alphabet', 3) = 'phabet' /// substr('alphabet', 3, 2) = 'ph' -pub fn substr(args: &[ArrayRef]) -> Result { +pub fn substr(args: &[ArrayRef]) -> Result { match args.len() { 2 => { let string_array = downcast_string_arg!(args[0], "string", T); @@ -497,7 +495,7 @@ pub fn substr(args: &[ArrayRef]) -> Result { /// Replaces each character in string that matches a character in the from set with the corresponding character in the to set. 
If from is longer than to, occurrences of the extra characters in from are deleted. /// translate('12345', '143', 'ax') = 'a2x5' -pub fn translate(args: &[ArrayRef]) -> Result { +pub fn translate(args: &[ArrayRef]) -> Result { let string_array = downcast_string_arg!(args[0], "string", T); let from_array = downcast_string_arg!(args[1], "from", T); let to_array = downcast_string_arg!(args[2], "to", T); From 859d822345681924c7cd8ba4b8c3848d1452c9f5 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 13 May 2022 13:14:41 -0400 Subject: [PATCH 4/7] Update for new API --- ballista/rust/core/src/client.rs | 11 +++++++++-- datafusion-examples/examples/flight_client.rs | 3 ++- datafusion/core/src/from_slice.rs | 10 +++++----- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/ballista/rust/core/src/client.rs b/ballista/rust/core/src/client.rs index 54418884d3129..b80a6dbc01c4d 100644 --- a/ballista/rust/core/src/client.rs +++ b/ballista/rust/core/src/client.rs @@ -17,6 +17,7 @@ //! Client API for sending requests to executors. +use std::collections::HashMap; use std::sync::Arc; use std::{ @@ -31,6 +32,7 @@ use crate::serde::scheduler::Action; use arrow_flight::utils::flight_data_to_arrow_batch; use arrow_flight::Ticket; use arrow_flight::{flight_service_client::FlightServiceClient, FlightData}; +use datafusion::arrow::array::ArrayRef; use datafusion::arrow::{ datatypes::{Schema, SchemaRef}, error::{ArrowError, Result as ArrowResult}, @@ -131,11 +133,16 @@ impl BallistaClient { struct FlightDataStream { stream: Streaming, schema: SchemaRef, + dictionaries_by_id: HashMap, } impl FlightDataStream { pub fn new(stream: Streaming, schema: SchemaRef) -> Self { - Self { stream, schema } + Self { + stream, + schema, + dictionaries_by_id: HashMap::new(), + } } } @@ -154,7 +161,7 @@ impl Stream for FlightDataStream { flight_data_to_arrow_batch( &flight_data_chunk, self.schema.clone(), - &[], + &mut self.dictionaries_by_id, ) }); Some(converted_chunk) diff --git a/datafusion-examples/examples/flight_client.rs b/datafusion-examples/examples/flight_client.rs index e20397cb96d79..858020f68d4b0 100644 --- a/datafusion-examples/examples/flight_client.rs +++ b/datafusion-examples/examples/flight_client.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashMap; use std::convert::TryFrom; use std::sync::Arc; @@ -62,7 +63,7 @@ async fn main() -> Result<(), Box> { // all the remaining stream messages should be dictionary and record batches let mut results = vec![]; - let dictionaries_by_field = vec![None; schema.fields().len()]; + let mut dictionaries_by_field = HashMap::new(); while let Some(flight_data) = stream.message().await? { let record_batch = flight_data_to_arrow_batch( &flight_data, diff --git a/datafusion/core/src/from_slice.rs b/datafusion/core/src/from_slice.rs index 42b8671d18b7b..2fedc668ae4ef 100644 --- a/datafusion/core/src/from_slice.rs +++ b/datafusion/core/src/from_slice.rs @@ -20,8 +20,8 @@ //! 
This file essentially exists to ease the transition onto arrow2 use arrow::array::{ - ArrayData, BinaryOffsetSizeTrait, BooleanArray, GenericBinaryArray, - GenericStringArray, PrimitiveArray, StringOffsetSizeTrait, + ArrayData, BooleanArray, GenericBinaryArray, GenericStringArray, OffsetSizeTrait, + PrimitiveArray, }; use arrow::buffer::{Buffer, MutableBuffer}; use arrow::datatypes::{ArrowPrimitiveType, DataType}; @@ -50,7 +50,7 @@ where /// default implementation for binary array types, adapted from `From>` impl FromSlice for GenericBinaryArray where - OffsetSize: BinaryOffsetSizeTrait, + OffsetSize: OffsetSizeTrait, S: AsRef<[I]>, I: AsRef<[u8]>, { @@ -69,7 +69,7 @@ where offsets.push(length_so_far); values.extend_from_slice(s); } - let array_data = ArrayData::builder(OffsetSize::DATA_TYPE) + let array_data = ArrayData::builder(Self::get_data_type()) .len(slice.len()) .add_buffer(Buffer::from_slice_ref(&offsets)) .add_buffer(Buffer::from_slice_ref(&values)); @@ -81,7 +81,7 @@ where /// default implementation for utf8 array types, adapted from `From>` impl FromSlice for GenericStringArray where - OffsetSize: StringOffsetSizeTrait, + OffsetSize: OffsetSizeTrait, S: AsRef<[I]>, I: AsRef, { From 194bfeec9e1d525824d6325adf0ec22c23db1c67 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 13 May 2022 13:51:25 -0400 Subject: [PATCH 5/7] clippy --- datafusion-examples/examples/flight_client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion-examples/examples/flight_client.rs b/datafusion-examples/examples/flight_client.rs index 858020f68d4b0..9fea1ab7db7e6 100644 --- a/datafusion-examples/examples/flight_client.rs +++ b/datafusion-examples/examples/flight_client.rs @@ -63,7 +63,7 @@ async fn main() -> Result<(), Box> { // all the remaining stream messages should be dictionary and record batches let mut results = vec![]; - let mut dictionaries_by_field = HashMap::new(); + let dictionaries_by_field = HashMap::new(); while let Some(flight_data) = stream.message().await? 
{ let record_batch = flight_data_to_arrow_batch( &flight_data, From c7d7d499c78d2840bf0141c20544ebe33e73d72d Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 13 May 2022 13:53:34 -0400 Subject: [PATCH 6/7] moar clippy --- ballista/rust/core/src/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ballista/rust/core/src/client.rs b/ballista/rust/core/src/client.rs index b80a6dbc01c4d..a5c4a062be8b7 100644 --- a/ballista/rust/core/src/client.rs +++ b/ballista/rust/core/src/client.rs @@ -161,7 +161,7 @@ impl Stream for FlightDataStream { flight_data_to_arrow_batch( &flight_data_chunk, self.schema.clone(), - &mut self.dictionaries_by_id, + &self.dictionaries_by_id, ) }); Some(converted_chunk) From 3545e4a3d0b6acb3a704b3cbb28580c43a10469a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Fri, 13 May 2022 13:56:27 -0400 Subject: [PATCH 7/7] Update to after https://github.com/apache/arrow-rs/pull/1682 --- Cargo.toml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fe55edef493ac..014eeced27462 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,6 @@ codegen-units = 1 lto = true [patch.crates-io] -arrow = { git = "https://github.com/apache/arrow-rs.git", tag="14.0.0" } -parquet = { git = "https://github.com/apache/arrow-rs.git", tag="14.0.0" } -arrow-flight = { git = "https://github.com/apache/arrow-rs.git", tag="14.0.0" } +arrow = { git = "https://github.com/apache/arrow-rs.git", rev="5b154ea40314dc2f09babbb363bf7f1fe439d4eb" } +parquet = { git = "https://github.com/apache/arrow-rs.git", rev="5b154ea40314dc2f09babbb363bf7f1fe439d4eb" } +arrow-flight = { git = "https://github.com/apache/arrow-rs.git", rev="5b154ea40314dc2f09babbb363bf7f1fe439d4eb" }
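Note on patch 3: the consolidation tracks an upstream arrow 14 change in which the separate StringOffsetSizeTrait and BinaryOffsetSizeTrait bounds were removed, so GenericStringArray and GenericBinaryArray are both parameterized by the single OffsetSizeTrait. A minimal sketch of what generic string code looks like after that change follows; it is illustrative only (the helper count_longer_than is hypothetical and not part of this series), but it shows that one bound now covers StringArray (i32 offsets) and LargeStringArray (i64 offsets) alike.

use arrow::array::{GenericStringArray, LargeStringArray, OffsetSizeTrait, StringArray};

/// Count the non-null values longer than `min_len` bytes.
/// Before arrow 14 this bound would have been `T: StringOffsetSizeTrait`.
fn count_longer_than<T: OffsetSizeTrait>(
    array: &GenericStringArray<T>,
    min_len: usize,
) -> usize {
    array
        .iter()
        .filter(|v| matches!(v, Some(s) if s.len() > min_len))
        .count()
}

fn main() {
    let small = StringArray::from(vec![Some("a"), None, Some("abcd")]);
    let large = LargeStringArray::from(vec![Some("hello"), Some("hi")]);
    // The same generic kernel now accepts both offset widths.
    assert_eq!(count_longer_than(&small, 2), 1);
    assert_eq!(count_longer_than(&large, 2), 1);
}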
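Note on patch 4: flight_data_to_arrow_batch in arrow-flight 14 takes dictionary state keyed by IPC dictionary id (a HashMap of id to ArrayRef, as seen in the ballista client change above) rather than the old per-field slice of optional arrays. A minimal client-side decode loop under that assumption might look like the sketch below; the Flight connection setup is elided and decode_batches is an illustrative helper, not code from this series. As in the flight_client.rs example, the map can stay empty when the stream carries no dictionary batches.

use std::collections::HashMap;
use std::sync::Arc;

use arrow::datatypes::Schema;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::FlightData;

/// Decode already-received FlightData messages into RecordBatches.
/// Assumes the schema message has been handled separately.
fn decode_batches(
    schema: Arc<Schema>,
    messages: &[FlightData],
) -> ArrowResult<Vec<RecordBatch>> {
    // New in arrow-flight 14: dictionaries are tracked by dictionary id,
    // not by field position; the key type is inferred from the call below.
    let dictionaries_by_id = HashMap::new();

    messages
        .iter()
        .map(|data| {
            flight_data_to_arrow_batch(data, schema.clone(), &dictionaries_by_id)
        })
        .collect()
}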