diff --git a/rust/arrow/src/util/display.rs b/rust/arrow/src/util/display.rs
index 1a873f1b083..7d4b10e3402 100644
--- a/rust/arrow/src/util/display.rs
+++ b/rust/arrow/src/util/display.rs
@@ -44,6 +44,33 @@ macro_rules! make_string {
     }};
 }
 
+// `array.value($row)` is a `&[u8]`, which has no `to_string()`, so binary values are
+// rendered as lowercase hex (two digits per byte); null slots render as an empty string
+macro_rules! make_string_hex {
+    ($array_type:ty, $column: ident, $row: ident) => {{
+        let array = $column.as_any().downcast_ref::<$array_type>().unwrap();
+
+        let s = if array.is_null($row) {
+            String::new()
+        } else {
+            // scoped to this block so the expansion does not leak the import
+            use std::fmt::Write;
+
+            let bytes = array.value($row);
+            // reserve the full length once instead of allocating a `String` per byte
+            let mut hex = String::with_capacity(bytes.len() * 2);
+            for byte in bytes {
+                // formatting into a `String` cannot fail, so the result is ignored
+                let _ = write!(hex, "{:02x}", byte);
+            }
+
+            hex
+        };
+
+        Ok(s)
+    }};
+}
+
 macro_rules! make_string_from_list {
     ($column: ident, $row: ident) => {{
         let list = $column
@@ -67,6 +94,9 @@ macro_rules! make_string_from_list {
 pub fn array_value_to_string(column: &array::ArrayRef, row: usize) -> Result<String> {
     match column.data_type() {
         DataType::Utf8 => make_string!(array::StringArray, column, row),
+        DataType::LargeUtf8 => make_string!(array::LargeStringArray, column, row),
+        DataType::Binary => make_string_hex!(array::BinaryArray, column, row),
+        DataType::LargeBinary => make_string_hex!(array::LargeBinaryArray, column, row),
         DataType::Boolean => make_string!(array::BooleanArray, column, row),
         DataType::Int8 => make_string!(array::Int8Array, column, row),
         DataType::Int16 => make_string!(array::Int16Array, column, row),
diff --git a/rust/datafusion/Cargo.toml b/rust/datafusion/Cargo.toml
index d1d055fcef9..4f5f2dcf3ef 100644
--- a/rust/datafusion/Cargo.toml
+++ b/rust/datafusion/Cargo.toml
@@ -61,6 +61,8 @@ futures = "0.3"
 pin-project-lite= "^0.2.0"
 tokio = { version = "0.2", features = ["macros", "blocking", "rt-core", "rt-threaded", "sync"] }
 log = "^0.4"
+md-5 = "^0.9.1"
+sha2 = "^0.9.1"
 
 [dev-dependencies]
 rand = "0.8"
diff --git a/rust/datafusion/src/logical_plan/expr.rs b/rust/datafusion/src/logical_plan/expr.rs
index f8e364a01f6..59d6add3d71 100644
--- a/rust/datafusion/src/logical_plan/expr.rs
+++ b/rust/datafusion/src/logical_plan/expr.rs
@@ -690,6 +690,11 @@ unary_scalar_expr!(Trim, trim);
 unary_scalar_expr!(Ltrim, ltrim);
 unary_scalar_expr!(Rtrim, rtrim);
 unary_scalar_expr!(Upper, upper);
+unary_scalar_expr!(MD5, md5);
+unary_scalar_expr!(SHA224, sha224);
+unary_scalar_expr!(SHA256, sha256);
+unary_scalar_expr!(SHA384, sha384);
+unary_scalar_expr!(SHA512, sha512);
 
 /// returns the length of a string in bytes
 pub fn length(e: Expr) -> Expr {
diff --git a/rust/datafusion/src/logical_plan/mod.rs b/rust/datafusion/src/logical_plan/mod.rs
index 24c493bda8f..3d6d5817d21 100644
--- a/rust/datafusion/src/logical_plan/mod.rs
+++ b/rust/datafusion/src/logical_plan/mod.rs
@@ -36,8 +36,9 @@ pub use display::display_schema;
 pub use expr::{
     abs, acos, and, array, asin, atan, avg, binary_expr, case, ceil, col, concat, cos,
     count, count_distinct, create_udaf, create_udf, exp, exprlist_to_fields, floor,
-    in_list, length, lit, ln, log10, log2, lower, ltrim, max, min, or, round, rtrim,
-    signum, sin, sqrt, sum, tan, trim, trunc, upper, when, Expr, Literal,
+    in_list, length, lit, ln, log10, log2, lower, ltrim, max, md5, min, or, round, rtrim,
+    sha224, sha256, sha384, sha512, signum, sin, sqrt, sum, tan, trim, trunc, upper,
+    when, Expr, Literal,
 };
 pub use extension::UserDefinedLogicalNode;
 pub use operators::Operator;
diff --git a/rust/datafusion/src/physical_plan/crypto_expressions.rs b/rust/datafusion/src/physical_plan/crypto_expressions.rs
new file mode 100644
index 00000000000..6a0940d4503
--- /dev/null
+++ b/rust/datafusion/src/physical_plan/crypto_expressions.rs
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Crypto expressions
+
+use md5::Md5;
+use sha2::{
+    digest::Output as SHA2DigestOutput, Digest as SHA2Digest, Sha224, Sha256, Sha384,
+    Sha512,
+};
+
+use crate::error::{DataFusionError, Result};
+use arrow::array::{
+    ArrayRef, GenericBinaryArray, GenericStringArray, StringOffsetSizeTrait,
+};
+
+/// Returns the MD5 digest of `input` as a lowercase hex string.
+fn md5_process(input: &str) -> String {
+    let mut digest = Md5::default();
+    digest.update(&input);
+
+    // the digest output is a `GenericArray<u8, _>`; its `LowerHex` impl writes
+    // two zero-padded hex digits per byte, so no per-byte `format!` is needed
+    format!("{:x}", digest.finalize())
+}
+
+/// Returns the raw digest bytes of `input` computed with the SHA-2 variant `D`.
+///
+/// The digest is returned by value because it is created inside this function,
+/// so a borrowed `&[u8]` could not outlive the call.
+fn sha_process<D: SHA2Digest + Default>(input: &str) -> SHA2DigestOutput<D> {
+    let mut digest = D::default();
+    digest.update(&input);
+
+    digest.finalize()
+}
+
+macro_rules! crypto_unary_string_function {
+    ($NAME:ident, $FUNC:expr) => {
+        /// crypto function that accepts Utf8 or LargeUtf8 and returns a string array
+        /// with the same offset size, matching what `return_type` reports for it
+        pub fn $NAME<T: StringOffsetSizeTrait>(
+            args: &[ArrayRef],
+        ) -> Result<GenericStringArray<T>> {
+            if args.len() != 1 {
+                return Err(DataFusionError::Internal(format!(
+                    "{:?} args were supplied but {} takes exactly one argument",
+                    args.len(),
+                    String::from(stringify!($NAME)),
+                )));
+            }
+
+            // report a type mismatch as an error instead of panicking on `unwrap()`
+            let array = args[0]
+                .as_any()
+                .downcast_ref::<GenericStringArray<T>>()
+                .ok_or_else(|| {
+                    DataFusionError::Internal(format!(
+                        "{} could not downcast its argument to a string array",
+                        stringify!($NAME),
+                    ))
+                })?;
+
+            // first map is the iterator, second is for the `Option<_>`
+            Ok(array.iter().map(|x| x.map(|x| $FUNC(x))).collect())
+        }
+    };
+}
+
+macro_rules! crypto_unary_binary_function {
+    ($NAME:ident, $FUNC:expr) => {
+        /// crypto function that accepts Utf8 or LargeUtf8 and returns the raw digest
+        /// as Binary (i32 offsets), matching what `return_type` reports for it
+        pub fn $NAME<T: StringOffsetSizeTrait>(
+            args: &[ArrayRef],
+        ) -> Result<GenericBinaryArray<i32>> {
+            if args.len() != 1 {
+                return Err(DataFusionError::Internal(format!(
+                    "{:?} args were supplied but {} takes exactly one argument",
+                    args.len(),
+                    String::from(stringify!($NAME)),
+                )));
+            }
+
+            // report a type mismatch as an error instead of panicking on `unwrap()`
+            let array = args[0]
+                .as_any()
+                .downcast_ref::<GenericStringArray<T>>()
+                .ok_or_else(|| {
+                    DataFusionError::Internal(format!(
+                        "{} could not downcast its argument to a string array",
+                        stringify!($NAME),
+                    ))
+                })?;
+
+            // first map is the iterator, second is for the `Option<_>`
+            Ok(array.iter().map(|x| x.map(|x| $FUNC(x))).collect())
+        }
+    };
+}
+
+crypto_unary_string_function!(md5, md5_process);
+crypto_unary_binary_function!(sha224, sha_process::<Sha224>);
+crypto_unary_binary_function!(sha256, sha_process::<Sha256>);
+crypto_unary_binary_function!(sha384, sha_process::<Sha384>);
+crypto_unary_binary_function!(sha512, sha_process::<Sha512>);
diff --git a/rust/datafusion/src/physical_plan/functions.rs b/rust/datafusion/src/physical_plan/functions.rs
index ff88ec7173d..cd92ac178c9 100644
--- a/rust/datafusion/src/physical_plan/functions.rs
+++ b/rust/datafusion/src/physical_plan/functions.rs
@@ -35,6 +35,7 @@ use super::{
 };
 use crate::error::{DataFusionError, Result};
 use crate::physical_plan::array_expressions;
+use crate::physical_plan::crypto_expressions;
 use crate::physical_plan::datetime_expressions;
 use crate::physical_plan::expressions::{nullif_func, SUPPORTED_NULLIF_TYPES};
 use crate::physical_plan::math_expressions;
@@ -136,6 +137,16 @@ pub enum BuiltinScalarFunction {
     NullIf,
     /// Date truncate
     DateTrunc,
+    /// MD5
+    MD5,
+    /// SHA224
+    SHA224,
+    /// SHA256
+    SHA256,
+    /// SHA384
+    SHA384,
+    /// SHA512
+    SHA512,
 }
 
 impl fmt::Display for BuiltinScalarFunction {
@@ -179,6 +190,11 @@ impl FromStr for BuiltinScalarFunction {
             "date_trunc" => BuiltinScalarFunction::DateTrunc,
             "array" => BuiltinScalarFunction::Array,
             "nullif" => BuiltinScalarFunction::NullIf,
+            "md5" => BuiltinScalarFunction::MD5,
+            "sha224" => BuiltinScalarFunction::SHA224,
+            "sha256" => BuiltinScalarFunction::SHA256,
+            "sha384" => BuiltinScalarFunction::SHA384,
+            "sha512" => BuiltinScalarFunction::SHA512,
             _ => {
                 return Err(DataFusionError::Plan(format!(
                     "There is no built-in function named {}",
@@ -288,6 +304,56 @@ pub fn return_type(
             let coerced_types = data_types(arg_types, &signature(fun));
             coerced_types.map(|typs| typs[0].clone())
         }
+        BuiltinScalarFunction::MD5 => Ok(match arg_types[0] {
+            DataType::LargeUtf8 => DataType::LargeUtf8,
+            DataType::Utf8 => DataType::Utf8,
+            _ => {
+                // this error is internal as `data_types` should have captured this.
+                return Err(DataFusionError::Internal(
+                    "The md5 function can only accept strings.".to_string(),
+                ));
+            }
+        }),
+        BuiltinScalarFunction::SHA224 => Ok(match arg_types[0] {
+            DataType::LargeUtf8 => DataType::Binary,
+            DataType::Utf8 => DataType::Binary,
+            _ => {
+                // this error is internal as `data_types` should have captured this.
+                return Err(DataFusionError::Internal(
+                    "The sha224 function can only accept strings.".to_string(),
+                ));
+            }
+        }),
+        BuiltinScalarFunction::SHA256 => Ok(match arg_types[0] {
+            DataType::LargeUtf8 => DataType::Binary,
+            DataType::Utf8 => DataType::Binary,
+            _ => {
+                // this error is internal as `data_types` should have captured this.
+                return Err(DataFusionError::Internal(
+                    "The sha256 function can only accept strings.".to_string(),
+                ));
+            }
+        }),
+        BuiltinScalarFunction::SHA384 => Ok(match arg_types[0] {
+            DataType::LargeUtf8 => DataType::Binary,
+            DataType::Utf8 => DataType::Binary,
+            _ => {
+                // this error is internal as `data_types` should have captured this.
+                return Err(DataFusionError::Internal(
+                    "The sha384 function can only accept strings.".to_string(),
+                ));
+            }
+        }),
+        BuiltinScalarFunction::SHA512 => Ok(match arg_types[0] {
+            DataType::LargeUtf8 => DataType::Binary,
+            DataType::Utf8 => DataType::Binary,
+            _ => {
+                // this error is internal as `data_types` should have captured this.
+                return Err(DataFusionError::Internal(
+                    "The sha512 function can only accept strings.".to_string(),
+                ));
+            }
+        }),
         _ => Ok(DataType::Float64),
     }
 }
@@ -318,6 +384,46 @@ pub fn create_physical_expr(
         BuiltinScalarFunction::Abs => math_expressions::abs,
         BuiltinScalarFunction::Signum => math_expressions::signum,
         BuiltinScalarFunction::NullIf => nullif_func,
+        BuiltinScalarFunction::MD5 => |args| match args[0].data_type() {
+            DataType::Utf8 => Ok(Arc::new(crypto_expressions::md5::<i32>(args)?)),
+            DataType::LargeUtf8 => Ok(Arc::new(crypto_expressions::md5::<i64>(args)?)),
+            other => Err(DataFusionError::Internal(format!(
+                "Unsupported data type {:?} for function md5",
+                other,
+            ))),
+        },
+        BuiltinScalarFunction::SHA224 => |args| match args[0].data_type() {
+            DataType::Utf8 => Ok(Arc::new(crypto_expressions::sha224::<i32>(args)?)),
+            DataType::LargeUtf8 => Ok(Arc::new(crypto_expressions::sha224::<i64>(args)?)),
+            other => Err(DataFusionError::Internal(format!(
+                "Unsupported data type {:?} for function sha224",
+                other,
+            ))),
+        },
+        BuiltinScalarFunction::SHA256 => |args| match args[0].data_type() {
+            DataType::Utf8 => Ok(Arc::new(crypto_expressions::sha256::<i32>(args)?)),
+            DataType::LargeUtf8 => Ok(Arc::new(crypto_expressions::sha256::<i64>(args)?)),
+            other => Err(DataFusionError::Internal(format!(
+                "Unsupported data type {:?} for function sha256",
+                other,
+            ))),
+        },
+        BuiltinScalarFunction::SHA384 => |args| match args[0].data_type() {
+            DataType::Utf8 => Ok(Arc::new(crypto_expressions::sha384::<i32>(args)?)),
+            DataType::LargeUtf8 => Ok(Arc::new(crypto_expressions::sha384::<i64>(args)?)),
+            other => Err(DataFusionError::Internal(format!(
+                "Unsupported data type {:?} for function sha384",
+                other,
+            ))),
+        },
+        BuiltinScalarFunction::SHA512 => |args| match args[0].data_type() {
+            DataType::Utf8 => Ok(Arc::new(crypto_expressions::sha512::<i32>(args)?)),
+            DataType::LargeUtf8 => Ok(Arc::new(crypto_expressions::sha512::<i64>(args)?)),
+            other => Err(DataFusionError::Internal(format!(
+                "Unsupported data type {:?} for function sha512",
+                other,
+            ))),
+        },
         BuiltinScalarFunction::Length => |args| Ok(length(args[0].as_ref())?),
         BuiltinScalarFunction::Concat => {
             |args| Ok(Arc::new(string_expressions::concatenate(args)?))
@@ -392,23 +498,18 @@ fn signature(fun: &BuiltinScalarFunction) -> Signature {
 
     // for now, the list is small, as we do not have many built-in functions.
     match fun {
-        BuiltinScalarFunction::Length => {
-            Signature::Uniform(1, vec![DataType::Utf8, DataType::LargeUtf8])
-        }
         BuiltinScalarFunction::Concat => Signature::Variadic(vec![DataType::Utf8]),
-        BuiltinScalarFunction::Lower => {
-            Signature::Uniform(1, vec![DataType::Utf8, DataType::LargeUtf8])
-        }
-        BuiltinScalarFunction::Upper => {
-            Signature::Uniform(1, vec![DataType::Utf8, DataType::LargeUtf8])
-        }
-        BuiltinScalarFunction::Trim => {
-            Signature::Uniform(1, vec![DataType::Utf8, DataType::LargeUtf8])
-        }
-        BuiltinScalarFunction::Ltrim => {
-            Signature::Uniform(1, vec![DataType::Utf8, DataType::LargeUtf8])
-        }
-        BuiltinScalarFunction::Rtrim => {
+        BuiltinScalarFunction::Upper
+        | BuiltinScalarFunction::Lower
+        | BuiltinScalarFunction::Length
+        | BuiltinScalarFunction::Trim
+        | BuiltinScalarFunction::Ltrim
+        | BuiltinScalarFunction::Rtrim
+        | BuiltinScalarFunction::MD5
+        | BuiltinScalarFunction::SHA224
+        | BuiltinScalarFunction::SHA256
+        | BuiltinScalarFunction::SHA384
+        | BuiltinScalarFunction::SHA512 => {
             Signature::Uniform(1, vec![DataType::Utf8, DataType::LargeUtf8])
         }
         BuiltinScalarFunction::ToTimestamp => Signature::Uniform(1, vec![DataType::Utf8]),
diff --git a/rust/datafusion/src/physical_plan/mod.rs b/rust/datafusion/src/physical_plan/mod.rs
index f2b984bb306..ebcc35ff53a 100644
--- a/rust/datafusion/src/physical_plan/mod.rs
+++ b/rust/datafusion/src/physical_plan/mod.rs
@@ -270,6 +270,7 @@ pub mod aggregates;
 pub mod array_expressions;
 pub mod coalesce_batches;
 pub mod common;
+pub mod crypto_expressions;
 pub mod csv;
 pub mod datetime_expressions;
 pub mod distinct_expressions;
diff --git
a/rust/datafusion/src/prelude.rs b/rust/datafusion/src/prelude.rs
index 1879bd5cd21..4575de19c66 100644
--- a/rust/datafusion/src/prelude.rs
+++ b/rust/datafusion/src/prelude.rs
@@ -29,6 +29,7 @@ pub use crate::dataframe::DataFrame;
 pub use crate::execution::context::{ExecutionConfig, ExecutionContext};
 pub use crate::logical_plan::{
     array, avg, col, concat, count, create_udf, in_list, length, lit, lower, ltrim, max,
-    min, rtrim, sum, trim, upper, JoinType, Partitioning,
+    md5, min, rtrim, sha224, sha256, sha384, sha512, sum, trim, upper, JoinType,
+    Partitioning,
 };
 pub use crate::physical_plan::csv::CsvReadOptions;
diff --git a/rust/datafusion/tests/sql.rs b/rust/datafusion/tests/sql.rs
index 2904b748a5d..2cfe40538c4 100644
--- a/rust/datafusion/tests/sql.rs
+++ b/rust/datafusion/tests/sql.rs
@@ -1854,6 +1854,45 @@ async fn string_expressions() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn crypto_expressions() -> Result<()> {
+    let mut ctx = ExecutionContext::new();
+    let sql = "SELECT
+        md5('tom') AS md5_tom,
+        md5('') AS md5_empty_str,
+        md5(null) AS md5_null,
+        sha224('tom') AS sha224_tom,
+        sha224('') AS sha224_empty_str,
+        sha224(null) AS sha224_null,
+        sha256('tom') AS sha256_tom,
+        sha256('') AS sha256_empty_str,
+        sha384('tom') AS sha384_tom,
+        sha384('') AS sha384_empty_str,
+        sha512('tom') AS sha512_tom,
+        sha512('') AS sha512_empty_str
+    ";
+    let actual = execute(&mut ctx, sql).await;
+
+    // every digest is compared as lowercase hex, in the same order as the SELECT
+    // list above; a null input is expected to come back as "NULL"
+    let expected = vec![vec![
+        "34b7da764b21d298ef307d04d8152dc5",
+        "d41d8cd98f00b204e9800998ecf8427e",
+        "NULL",
+        "0bf6cb62649c42a9ae3876ab6f6d92ad36cb5414e495f8873292be4d",
+        "d14a028c2a3a2bc9476102bb288234c415a2b01f828ea62ac5b3e42f",
+        "NULL",
+        "e1608f75c5d7813f3d4031cb30bfb786507d98137538ff8e128a6ff74e84e643",
+        "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
+        "096f5b68aa77848e4fdf5c1c0b350de2dbfad60ffd7c25d9ea07c6c19b8a4d55a9187eb117c557883f58c16dfac3e343",
+        "38b060a751ac96384cd9327eb1b1e36a21fdb71114be07434c0cc7bf63f6e1da274edebfe76f65fbd51ad2f14898b95b",
+        "6e1b9b3fe840680e37051f7ad5e959d6f39ad0f8885d855166f55c659469d3c8b78118c44a2a49c72ddb481cd6d8731034e11cc030070ba843a90b3495cb8d3e",
+        "cf83e1357eefb8bdf1542850d66d8007d620e4050b5715dc83f4a921d36ce9ce47d0d13c5d85f2b0ff8318d2877eec2f63b931bd47417a81a538327af927da3e"
+    ]];
+    assert_eq!(expected, actual);
+    Ok(())
+}
+
 #[tokio::test]
 async fn in_list_array() -> Result<()> {
     let mut ctx = ExecutionContext::new();