From f2846b083f5dda471f734f5805f1aa6cce92fae8 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 1 Jan 2024 08:47:03 -0500 Subject: [PATCH 01/12] Extract encode and decode to `datafusion-functions` crate --- .github/workflows/rust.yml | 6 + Cargo.toml | 3 +- datafusion-cli/Cargo.lock | 110 +++++++++------- datafusion/core/Cargo.toml | 3 +- datafusion/core/src/dataframe/mod.rs | 2 +- datafusion/core/src/execution/context/mod.rs | 14 ++- datafusion/core/src/lib.rs | 5 + datafusion/core/src/prelude.rs | 1 + .../tests/dataframe/dataframe_functions.rs | 59 ++++++++- datafusion/execution/src/registry.rs | 62 ++++++++- datafusion/expr/src/built_in_function.rs | 52 -------- datafusion/expr/src/expr_fn.rs | 34 ----- datafusion/functions/Cargo.toml | 51 ++++++++ datafusion/functions/README.md | 26 ++++ .../src/encoding/inner.rs} | 113 ++++++++++++++++- datafusion/functions/src/encoding/mod.rs | 26 ++++ datafusion/functions/src/lib.rs | 89 +++++++++++++ datafusion/functions/src/macros.rs | 118 ++++++++++++++++++ datafusion/physical-expr/src/functions.rs | 26 ---- datafusion/physical-expr/src/lib.rs | 2 - datafusion/proto/proto/datafusion.proto | 2 - datafusion/proto/src/generated/pbjson.rs | 6 - datafusion/proto/src/generated/prost.rs | 6 - .../proto/src/logical_plan/from_proto.rs | 12 +- datafusion/proto/src/logical_plan/to_proto.rs | 2 - .../tests/cases/roundtrip_logical_plan.rs | 33 ++++- 26 files changed, 660 insertions(+), 203 deletions(-) create mode 100644 datafusion/functions/Cargo.toml create mode 100644 datafusion/functions/README.md rename datafusion/{physical-expr/src/encoding_expressions.rs => functions/src/encoding/inner.rs} (82%) create mode 100644 datafusion/functions/src/encoding/mod.rs create mode 100644 datafusion/functions/src/lib.rs create mode 100644 datafusion/functions/src/macros.rs diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index a541091e3a2b..81a916067104 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -76,6 +76,12 @@ jobs: - name: Check workspace with all features run: cargo check --workspace --benches --features avro,json + + # Ensure that the datafusion crate can be built with only a subset of the function + # packages enabled. 
+ - name: Check function packages (encoding_expressions) + run: cargo check --no-default-features --features=encoding_expressions -p datafusion + - name: Check Cargo.lock for datafusion-cli run: | # If this test fails, try running `cargo update` in the `datafusion-cli` directory diff --git a/Cargo.toml b/Cargo.toml index a87923b6a1a0..159e7157af31 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ [workspace] exclude = ["datafusion-cli"] -members = ["datafusion/common", "datafusion/core", "datafusion/expr", "datafusion/execution", "datafusion/optimizer", "datafusion/physical-expr", "datafusion/physical-plan", "datafusion/proto", "datafusion/proto/gen", "datafusion/sql", "datafusion/sqllogictest", "datafusion/substrait", "datafusion/wasmtest", "datafusion-examples", "docs", "test-utils", "benchmarks", +members = ["datafusion/common", "datafusion/core", "datafusion/expr", "datafusion/execution", "datafusion/functions", "datafusion/optimizer", "datafusion/physical-expr", "datafusion/physical-plan", "datafusion/proto", "datafusion/proto/gen", "datafusion/sql", "datafusion/sqllogictest", "datafusion/substrait", "datafusion/wasmtest", "datafusion-examples", "docs", "test-utils", "benchmarks", ] resolver = "2" @@ -49,6 +49,7 @@ datafusion = { path = "datafusion/core", version = "34.0.0" } datafusion-common = { path = "datafusion/common", version = "34.0.0" } datafusion-execution = { path = "datafusion/execution", version = "34.0.0" } datafusion-expr = { path = "datafusion/expr", version = "34.0.0" } +datafusion-functions = { path = "datafusion/functions", version = "34.0.0" } datafusion-optimizer = { path = "datafusion/optimizer", version = "34.0.0" } datafusion-physical-expr = { path = "datafusion/physical-expr", version = "34.0.0" } datafusion-physical-plan = { path = "datafusion/physical-plan", version = "34.0.0" } diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index e85e8b1a9edb..b481c2d7dced 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -25,9 +25,9 @@ checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" [[package]] name = "ahash" -version = "0.8.6" +version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91429305e9f0a25f6205c5b8e0d2db09e0708a7a6df0f42212bb56c32c8ac97a" +checksum = "77c3a9648d43b9cd48db467b3f87fdd6e146bcc88ab0180006cef2179fe11d01" dependencies = [ "cfg-if", "const-random", @@ -379,13 +379,13 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.75" +version = "0.1.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fdf6721fb0140e4f897002dd086c06f6c27775df19cfe1fccb21181a48fd2c98" +checksum = "531b97fb4cd3dfdce92c35dedbfdc1f0b9d8091c8ca943d6dae340ef5012d514" dependencies = [ "proc-macro2", "quote", - "syn 2.0.43", + "syn 2.0.44", ] [[package]] @@ -792,9 +792,9 @@ dependencies = [ [[package]] name = "bstr" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "542f33a8835a0884b006a0c3df3dadd99c0c3f296ed26c2fdc8028e01ad6230c" +checksum = "c48f0051a4b4c5e0b6d365cd04af53aeaa209e3cc15ec2cdb69e73cc87fbd0dc" dependencies = [ "memchr", "regex-automata", @@ -881,9 +881,9 @@ dependencies = [ [[package]] name = "chrono-tz" -version = "0.8.4" +version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e23185c0e21df6ed832a12e2bda87c7d1def6842881fb634a8511ced741b0d76" +checksum = 
"91d7b79e99bfaa0d47da0687c43aa3b7381938a62ad3a6498599039321f660b7" dependencies = [ "chrono", "chrono-tz-build", @@ -1075,7 +1075,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "30d2b3721e861707777e3195b0158f950ae6dc4a27e4d02ff9f67e3eb3de199e" dependencies = [ "quote", - "syn 2.0.43", + "syn 2.0.44", ] [[package]] @@ -1116,6 +1116,7 @@ dependencies = [ "datafusion-common", "datafusion-execution", "datafusion-expr", + "datafusion-functions", "datafusion-optimizer", "datafusion-physical-expr", "datafusion-physical-plan", @@ -1225,6 +1226,19 @@ dependencies = [ "strum_macros", ] +[[package]] +name = "datafusion-functions" +version = "34.0.0" +dependencies = [ + "arrow", + "base64", + "datafusion-common", + "datafusion-execution", + "datafusion-expr", + "hex", + "log", +] + [[package]] name = "datafusion-optimizer" version = "34.0.0" @@ -1316,9 +1330,9 @@ dependencies = [ [[package]] name = "deranged" -version = "0.3.10" +version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8eb30d70a07a3b04884d2677f06bec33509dc67ca60d92949e5535352d3191dc" +checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" dependencies = [ "powerfmt", ] @@ -1579,7 +1593,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.43", + "syn 2.0.44", ] [[package]] @@ -1837,9 +1851,9 @@ dependencies = [ [[package]] name = "iana-time-zone" -version = "0.1.58" +version = "0.1.59" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8326b86b6cff230b97d0d312a6c40a60726df3332e721f72a1b035f451663b20" +checksum = "b6a67363e2aa4443928ce15e57ebae94fd8949958fd1223c4cfc0cd473ad7539" dependencies = [ "android_system_properties", "core-foundation-sys", @@ -2132,9 +2146,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.6.4" +version = "2.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" +checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" [[package]] name = "mimalloc" @@ -2499,7 +2513,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" dependencies = [ "proc-macro2", "quote", - "syn 2.0.43", + "syn 2.0.44", ] [[package]] @@ -2589,9 +2603,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.71" +version = "1.0.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75cb1540fadbd5b8fbccc4dddad2734eba435053f725621c070711a14bb5f4b8" +checksum = "2dd5e8a1f1029c43224ad5898e50140c2aebb1705f19e67c918ebf5b9e797fe1" dependencies = [ "unicode-ident", ] @@ -2614,9 +2628,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.33" +version = "1.0.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" +checksum = "22a37c9326af5ed140c86a46655b5278de879853be5573c01df185b6f49a580a" dependencies = [ "proc-macro2", ] @@ -2948,11 +2962,11 @@ dependencies = [ [[package]] name = "schannel" -version = "0.1.22" +version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c3733bf4cf7ea0880754e19cb5a462007c4a8c1914bff372ccc95b464f1df88" +checksum = "fbc91545643bcf3a0bbb6569265615222618bdf33ce4ffbbd13c4bbd4c093534" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -3023,14 +3037,14 @@ 
checksum = "43576ca501357b9b071ac53cdc7da8ef0cbd9493d8df094cd821777ea6e894d3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.43", + "syn 2.0.44", ] [[package]] name = "serde_json" -version = "1.0.108" +version = "1.0.109" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b" +checksum = "cb0652c533506ad7a2e353cce269330d6afd8bdfb6d75e0ace5b35aacbd7b9e9" dependencies = [ "itoa", "ryu", @@ -3189,7 +3203,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.43", + "syn 2.0.44", ] [[package]] @@ -3211,9 +3225,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.43" +version = "2.0.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee659fb5f3d355364e1f3e5bc10fb82068efbf824a1e9d1c9504244a6469ad53" +checksum = "92d27c2c202598d05175a6dd3af46824b7f747f8d8e9b14c623f19fa5069735d" dependencies = [ "proc-macro2", "quote", @@ -3243,15 +3257,15 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.8.1" +version = "3.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ef1adac450ad7f4b3c28589471ade84f25f731a7a0fe30d71dfa9f60fd808e5" +checksum = "01ce4141aa927a6d1bd34a041795abd0db1cccba5d5f24b009f694bdf3a1f3fa" dependencies = [ "cfg-if", "fastrand 2.0.1", "redox_syscall", "rustix", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -3277,22 +3291,22 @@ checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" [[package]] name = "thiserror" -version = "1.0.52" +version = "1.0.53" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83a48fd946b02c0a526b2e9481c8e2a17755e47039164a86c4070446e3a4614d" +checksum = "b2cd5904763bad08ad5513ddbb12cf2ae273ca53fa9f68e843e236ec6dfccc09" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.52" +version = "1.0.53" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7fbe9b594d6568a6a1443250a7e67d80b74e1e96f6d1715e1e21cc1888291d3" +checksum = "3dcf4a824cce0aeacd6f38ae6f24234c8e80d68632338ebaa1443b5df9e29e19" dependencies = [ "proc-macro2", "quote", - "syn 2.0.43", + "syn 2.0.44", ] [[package]] @@ -3384,7 +3398,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.43", + "syn 2.0.44", ] [[package]] @@ -3481,7 +3495,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.43", + "syn 2.0.44", ] [[package]] @@ -3526,7 +3540,7 @@ checksum = "f03ca4cb38206e2bef0700092660bb74d696f808514dae47fa1467cbfe26e96e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.43", + "syn 2.0.44", ] [[package]] @@ -3680,7 +3694,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.43", + "syn 2.0.44", "wasm-bindgen-shared", ] @@ -3714,7 +3728,7 @@ checksum = "f0eb82fcb7930ae6219a7ecfd55b217f5f0893484b7a13022ebb2b2bf20b5283" dependencies = [ "proc-macro2", "quote", - "syn 2.0.43", + "syn 2.0.44", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -3797,11 +3811,11 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] name = "windows-core" -version = "0.51.1" +version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1f8cf84f35d2db49a46868f947758c7a1138116f7fac3bc844f43ade1292e64" +checksum = 
"33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" dependencies = [ - "windows-targets 0.48.5", + "windows-targets 0.52.0", ] [[package]] @@ -3978,7 +3992,7 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.43", + "syn 2.0.44", ] [[package]] diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 9de6a7f7d6a0..ffb0919b7926 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -40,7 +40,7 @@ backtrace = ["datafusion-common/backtrace"] compression = ["xz2", "bzip2", "flate2", "zstd", "async-compression"] crypto_expressions = ["datafusion-physical-expr/crypto_expressions", "datafusion-optimizer/crypto_expressions"] default = ["crypto_expressions", "encoding_expressions", "regex_expressions", "unicode_expressions", "compression", "parquet"] -encoding_expressions = ["datafusion-physical-expr/encoding_expressions"] +encoding_expressions = ["datafusion-functions/encoding_expressions"] # Used for testing ONLY: causes all values to hash to the same value (test for collisions) force_hash_collisions = [] parquet = ["datafusion-common/parquet", "dep:parquet"] @@ -66,6 +66,7 @@ dashmap = { workspace = true } datafusion-common = { path = "../common", version = "34.0.0", features = ["object_store"], default-features = false } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } +datafusion-functions = { path = "../functions", version = "34.0.0"} datafusion-optimizer = { path = "../optimizer", version = "34.0.0", default-features = false } datafusion-physical-expr = { path = "../physical-expr", version = "34.0.0", default-features = false } datafusion-physical-plan = { workspace = true } diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 5a8c706e32cd..5405a815215b 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -40,7 +40,6 @@ use crate::physical_plan::{ collect, collect_partitioned, execute_stream, execute_stream_partitioned, ExecutionPlan, SendableRecordBatchStream, }; -use crate::prelude::SessionContext; use arrow::array::{Array, ArrayRef, Int64Array, StringArray}; use arrow::compute::{cast, concat}; @@ -59,6 +58,7 @@ use datafusion_expr::{ TableProviderFilterPushDown, UNNAMED_TABLE, }; +use crate::prelude::SessionContext; use async_trait::async_trait; /// Contains options that control how data is diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index d6b7f046f3e3..269b04b1ef53 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -1338,7 +1338,7 @@ impl SessionState { ); } - SessionState { + let mut new_self = SessionState { session_id, analyzer: Analyzer::new(), optimizer: Optimizer::new(), @@ -1354,7 +1354,13 @@ impl SessionState { execution_props: ExecutionProps::new(), runtime_env: runtime, table_factories, - } + }; + + // register built in functions + datafusion_functions::register_all(&mut new_self) + .expect("can not register built in functions"); + + new_self } /// Returns new [`SessionState`] using the provided /// [`SessionConfig`] and [`RuntimeEnv`]. 
@@ -1962,6 +1968,10 @@ impl FunctionRegistry for SessionState { plan_datafusion_err!("There is no UDWF named \"{name}\" in the registry") }) } + + fn register_udf(&mut self, udf: Arc) -> Result>> { + Ok(self.scalar_functions.insert(udf.name().into(), udf)) + } } impl OptimizerConfig for SessionState { diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index b3ebbc6e3637..44539a4357e2 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -491,6 +491,11 @@ pub mod sql { pub use datafusion_sql::*; } +/// re-export of [`datafusion_functions`] crate +pub mod functions { + pub use datafusion_functions::*; +} + #[cfg(test)] pub mod test; pub mod test_util; diff --git a/datafusion/core/src/prelude.rs b/datafusion/core/src/prelude.rs index 5cd8b3870f81..69c33355402b 100644 --- a/datafusion/core/src/prelude.rs +++ b/datafusion/core/src/prelude.rs @@ -38,6 +38,7 @@ pub use datafusion_expr::{ logical_plan::{JoinType, Partitioning}, Expr, }; +pub use datafusion_functions::expr_fn::*; pub use std::ops::Not; pub use std::ops::{Add, Div, Mul, Neg, Rem, Sub}; diff --git a/datafusion/core/tests/dataframe/dataframe_functions.rs b/datafusion/core/tests/dataframe/dataframe_functions.rs index fe56fc22ea8c..864ce5f8b3ac 100644 --- a/datafusion/core/tests/dataframe/dataframe_functions.rs +++ b/datafusion/core/tests/dataframe/dataframe_functions.rs @@ -20,6 +20,7 @@ use arrow::{ array::{Int32Array, StringArray}, record_batch::RecordBatch, }; +use arrow_schema::SchemaRef; use std::sync::Arc; use datafusion::dataframe::DataFrame; @@ -31,14 +32,19 @@ use datafusion::prelude::*; use datafusion::execution::context::SessionContext; use datafusion::assert_batches_eq; +use datafusion_common::DFSchema; use datafusion_expr::expr::Alias; -use datafusion_expr::{approx_median, cast}; +use datafusion_expr::{approx_median, cast, ExprSchemable}; -async fn create_test_table() -> Result { - let schema = Arc::new(Schema::new(vec![ +fn test_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ Field::new("a", DataType::Utf8, false), Field::new("b", DataType::Int32, false), - ])); + ])) +} + +async fn create_test_table() -> Result { + let schema = test_schema(); // define data. let batch = RecordBatch::try_new( @@ -750,3 +756,48 @@ async fn test_fn_upper() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn test_fn_encode() -> Result<()> { + let expr = encode(vec![col("a"), lit("hex")]); + + let expected = [ + "+----------------------------+", + "| encode(test.a,Utf8(\"hex\")) |", + "+----------------------------+", + "| 616263444546 |", + "| 616263313233 |", + "| 434241646566 |", + "| 313233416263446566 |", + "+----------------------------+", + ]; + assert_fn_batches!(expr, expected); + + Ok(()) +} + +#[tokio::test] +async fn test_fn_decode() -> Result<()> { + // Note that the decode function returns binary, and the default display of + // binary is "hexadecimal" and therefore the output looks like decode did + // nothing. So compare to a constant. 
+ let df_schema = DFSchema::try_from(test_schema().as_ref().clone())?; + let expr = decode(vec![encode(vec![col("a"), lit("hex")]), lit("hex")]) + // need to cast to utf8 otherwise the default display of binary array is hex + // so it looks like nothing is done + .cast_to(&DataType::Utf8, &df_schema)?; + + let expected = [ + "+------------------------------------------------+", + "| decode(encode(test.a,Utf8(\"hex\")),Utf8(\"hex\")) |", + "+------------------------------------------------+", + "| abcDEF |", + "| abc123 |", + "| CBAdef |", + "| 123AbcDef |", + "+------------------------------------------------+", + ]; + assert_fn_batches!(expr, expected); + + Ok(()) +} diff --git a/datafusion/execution/src/registry.rs b/datafusion/execution/src/registry.rs index 9ba487e715b3..8b538beed18c 100644 --- a/datafusion/execution/src/registry.rs +++ b/datafusion/execution/src/registry.rs @@ -17,8 +17,9 @@ //! FunctionRegistry trait -use datafusion_common::Result; +use datafusion_common::{not_impl_err, plan_datafusion_err, DataFusionError, Result}; use datafusion_expr::{AggregateUDF, ScalarUDF, UserDefinedLogicalNode, WindowUDF}; +use std::collections::HashMap; use std::{collections::HashSet, sync::Arc}; /// A registry knows how to build logical expressions out of user-defined function' names @@ -34,6 +35,17 @@ pub trait FunctionRegistry { /// Returns a reference to the udwf named `name`. fn udwf(&self, name: &str) -> Result>; + + /// Registers a new [`ScalarUDF`], returning any previously registered + /// implementation. + /// + /// Returns an error (the default) if the function can not be registered, + /// for example if the registry is read only. + fn register_udf(&mut self, _udf: Arc) -> Result>> { + not_impl_err!("Registering ScalarUDF") + } + + // TODO add register_udaf and register_udwf } /// Serializer and deserializer registry for extensions like [UserDefinedLogicalNode]. 
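A hedged sketch of how downstream code might drive the new `register_udf` hook: a hypothetical helper that registers a batch of UDFs into any mutable `FunctionRegistry` and uses the returned `Option` to report replacements.

```rust
use std::sync::Arc;

use datafusion_common::Result;
use datafusion_execution::FunctionRegistry;
use datafusion_expr::ScalarUDF;

/// Hypothetical helper: register several ScalarUDFs, noting any overwrites.
fn register_udfs(
    registry: &mut dyn FunctionRegistry,
    udfs: Vec<Arc<ScalarUDF>>,
) -> Result<()> {
    for udf in udfs {
        // `register_udf` returns the previously registered implementation
        // (if any), so callers can detect and log overwrites.
        if let Some(previous) = registry.register_udf(udf)? {
            println!("replaced existing UDF: {}", previous.name());
        }
    }
    Ok(())
}
```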
@@ -53,3 +65,51 @@ pub trait SerializerRegistry: Send + Sync { bytes: &[u8], ) -> Result>; } + +/// A [`FunctionRegistry`] that uses in memory HashMaps +#[derive(Default, Debug)] +pub struct MemoryFunctionRegistry { + /// Scalar Functions + udfs: HashMap>, + /// Aggregate Functions + udafs: HashMap>, + /// Window Functions + udwfs: HashMap>, +} + +impl MemoryFunctionRegistry { + pub fn new() -> Self { + Self::default() + } +} + +impl FunctionRegistry for MemoryFunctionRegistry { + fn udfs(&self) -> HashSet { + self.udfs.keys().cloned().collect() + } + + fn udf(&self, name: &str) -> Result> { + self.udfs + .get(name) + .cloned() + .ok_or_else(|| plan_datafusion_err!("Function {name} not found")) + } + + fn udaf(&self, name: &str) -> Result> { + self.udafs + .get(name) + .cloned() + .ok_or_else(|| plan_datafusion_err!("Aggregate Function {name} not found")) + } + + fn udwf(&self, name: &str) -> Result> { + self.udwfs + .get(name) + .cloned() + .ok_or_else(|| plan_datafusion_err!("Window Function {name} not found")) + } + + fn register_udf(&mut self, udf: Arc) -> Result>> { + Ok(self.udfs.insert(udf.name().to_string(), udf)) + } +} diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs index e642dae06e4f..0732f8bd5c66 100644 --- a/datafusion/expr/src/built_in_function.rs +++ b/datafusion/expr/src/built_in_function.rs @@ -69,14 +69,10 @@ pub enum BuiltinScalarFunction { Cos, /// cos Cosh, - /// Decode - Decode, /// degrees Degrees, /// Digest Digest, - /// Encode - Encode, /// exp Exp, /// factorial @@ -373,9 +369,7 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::Coalesce => Volatility::Immutable, BuiltinScalarFunction::Cos => Volatility::Immutable, BuiltinScalarFunction::Cosh => Volatility::Immutable, - BuiltinScalarFunction::Decode => Volatility::Immutable, BuiltinScalarFunction::Degrees => Volatility::Immutable, - BuiltinScalarFunction::Encode => Volatility::Immutable, BuiltinScalarFunction::Exp => Volatility::Immutable, BuiltinScalarFunction::Factorial => Volatility::Immutable, BuiltinScalarFunction::Floor => Volatility::Immutable, @@ -758,30 +752,6 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::Digest => { utf8_or_binary_to_binary_type(&input_expr_types[0], "digest") } - BuiltinScalarFunction::Encode => Ok(match input_expr_types[0] { - Utf8 => Utf8, - LargeUtf8 => LargeUtf8, - Binary => Utf8, - LargeBinary => LargeUtf8, - Null => Null, - _ => { - return plan_err!( - "The encode function can only accept utf8 or binary." - ); - } - }), - BuiltinScalarFunction::Decode => Ok(match input_expr_types[0] { - Utf8 => Binary, - LargeUtf8 => LargeBinary, - Binary => Binary, - LargeBinary => LargeBinary, - Null => Null, - _ => { - return plan_err!( - "The decode function can only accept utf8 or binary." 
- ); - } - }), BuiltinScalarFunction::SplitPart => { utf8_to_str_type(&input_expr_types[0], "split_part") } @@ -1118,24 +1088,6 @@ impl BuiltinScalarFunction { ], self.volatility(), ), - BuiltinScalarFunction::Encode => Signature::one_of( - vec![ - Exact(vec![Utf8, Utf8]), - Exact(vec![LargeUtf8, Utf8]), - Exact(vec![Binary, Utf8]), - Exact(vec![LargeBinary, Utf8]), - ], - self.volatility(), - ), - BuiltinScalarFunction::Decode => Signature::one_of( - vec![ - Exact(vec![Utf8, Utf8]), - Exact(vec![LargeUtf8, Utf8]), - Exact(vec![Binary, Utf8]), - Exact(vec![LargeBinary, Utf8]), - ], - self.volatility(), - ), BuiltinScalarFunction::DateTrunc => Signature::one_of( vec![ Exact(vec![Utf8, Timestamp(Nanosecond, None)]), @@ -1570,10 +1522,6 @@ impl BuiltinScalarFunction { BuiltinScalarFunction::SHA384 => &["sha384"], BuiltinScalarFunction::SHA512 => &["sha512"], - // encode/decode - BuiltinScalarFunction::Encode => &["encode"], - BuiltinScalarFunction::Decode => &["decode"], - // other functions BuiltinScalarFunction::ArrowTypeof => &["arrow_typeof"], diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index eed41d97ccba..da4aafbea2fd 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -786,8 +786,6 @@ scalar_expr!( "converts the Unicode code point to a UTF8 character" ); scalar_expr!(Digest, digest, input algorithm, "compute the binary hash of `input`, using the `algorithm`"); -scalar_expr!(Encode, encode, input encoding, "encode the `input`, using the `encoding`. encoding can be base64 or hex"); -scalar_expr!(Decode, decode, input encoding, "decode the`input`, using the `encoding`. encoding can be base64 or hex"); scalar_expr!(InitCap, initcap, string, "converts the first letter of each word in `string` in uppercase and the remaining characters in lowercase"); scalar_expr!(Left, left, string n, "returns the first `n` characters in the `string`"); scalar_expr!(Lower, lower, string, "convert the string to lower case"); @@ -1196,8 +1194,6 @@ mod test { test_scalar_expr!(CharacterLength, character_length, string); test_scalar_expr!(Chr, chr, string); test_scalar_expr!(Digest, digest, string, algorithm); - test_scalar_expr!(Encode, encode, string, encoding); - test_scalar_expr!(Decode, decode, string, encoding); test_scalar_expr!(Gcd, gcd, arg_1, arg_2); test_scalar_expr!(Lcm, lcm, arg_1, arg_2); test_scalar_expr!(InitCap, initcap, string); @@ -1310,34 +1306,4 @@ mod test { unreachable!(); } } - - #[test] - fn encode_function_definitions() { - if let Expr::ScalarFunction(ScalarFunction { - func_def: ScalarFunctionDefinition::BuiltIn(fun), - args, - }) = encode(col("tableA.a"), lit("base64")) - { - let name = BuiltinScalarFunction::Encode; - assert_eq!(name, fun); - assert_eq!(2, args.len()); - } else { - unreachable!(); - } - } - - #[test] - fn decode_function_definitions() { - if let Expr::ScalarFunction(ScalarFunction { - func_def: ScalarFunctionDefinition::BuiltIn(fun), - args, - }) = decode(col("tableA.a"), lit("hex")) - { - let name = BuiltinScalarFunction::Decode; - assert_eq!(name, fun); - assert_eq!(2, args.len()); - } else { - unreachable!(); - } - } } diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml new file mode 100644 index 000000000000..6d4a716e2e8e --- /dev/null +++ b/datafusion/functions/Cargo.toml @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "datafusion-functions" +description = "Function packages for the DataFusion query engine" +keywords = ["datafusion", "logical", "plan", "expressions"] +readme = "README.md" +version = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +repository = { workspace = true } +license = { workspace = true } +authors = { workspace = true } +rust-version = { workspace = true } + +[features] +# Enable encoding by default so the doctests work. In general don't automatically enable all packages. +default = ["encoding_expressions"] +# enable the encode/decode functions +encoding_expressions = ["base64", "hex"] + + +[lib] +name = "datafusion_functions" +path = "src/lib.rs" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +arrow = { workspace = true } +base64 = { version = "0.21", optional = true } +datafusion-common = { workspace = true } +datafusion-execution = { workspace = true } +datafusion-expr = { workspace = true } +hex = { version = "0.4", optional = true } +log = "0.4.20" diff --git a/datafusion/functions/README.md b/datafusion/functions/README.md new file mode 100644 index 000000000000..214cc8ff0af5 --- /dev/null +++ b/datafusion/functions/README.md @@ -0,0 +1,26 @@ + + +# DataFusion Function Library + +[DataFusion][df] is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format. + +This crate contains several packages of function that can be used with DataFusion. 
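A short, hedged usage sketch of the `expr_fn` entry point added in this patch (at this stage of the series `encode` takes a `Vec<Expr>`; a later commit switches it to positional arguments). The column name is a placeholder:

```rust
use datafusion_expr::{col, lit};
use datafusion_functions::expr_fn;

fn main() {
    // Build an Expr equivalent to the SQL `encode(my_data, 'hex')`.
    let expr = expr_fn::encode(vec![col("my_data"), lit("hex")]);
    println!("{expr}");
}
```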
+ +[df]: https://crates.io/crates/datafusion diff --git a/datafusion/physical-expr/src/encoding_expressions.rs b/datafusion/functions/src/encoding/inner.rs similarity index 82% rename from datafusion/physical-expr/src/encoding_expressions.rs rename to datafusion/functions/src/encoding/inner.rs index b74310485fb7..5ca15a7e6b01 100644 --- a/datafusion/physical-expr/src/encoding_expressions.rs +++ b/datafusion/functions/src/encoding/inner.rs @@ -32,12 +32,123 @@ use datafusion_expr::ColumnarValue; use std::sync::Arc; use std::{fmt, str::FromStr}; +use datafusion_expr::TypeSignature::*; +use datafusion_expr::{ScalarUDFImpl, Signature, Volatility}; +use std::any::Any; + +#[derive(Debug)] +pub(super) struct EncodeFunc { + signature: Signature, +} + +impl EncodeFunc { + pub fn new() -> Self { + use DataType::*; + Self { + signature: Signature::one_of( + vec![ + Exact(vec![Utf8, Utf8]), + Exact(vec![LargeUtf8, Utf8]), + Exact(vec![Binary, Utf8]), + Exact(vec![LargeBinary, Utf8]), + ], + Volatility::Immutable, + ) + } + } +} + +impl ScalarUDFImpl for EncodeFunc { + fn as_any(&self) -> &dyn Any { + self + } + fn name(&self) -> &str { + "encode" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + use DataType::*; + + Ok(match arg_types[0] { + Utf8 => Utf8, + LargeUtf8 => LargeUtf8, + Binary => Utf8, + LargeBinary => LargeUtf8, + Null => Null, + _ => { + return plan_err!("The encode function can only accept utf8 or binary."); + } + }) + } + + fn invoke(&self, args: &[ColumnarValue]) -> Result { + encode(args) + } +} + +#[derive(Debug)] +pub(super) struct DecodeFunc { + signature: Signature, +} + +impl DecodeFunc { + pub fn new() -> Self { + use DataType::*; + Self { + signature: Signature::one_of( + vec![ + Exact(vec![Utf8, Utf8]), + Exact(vec![LargeUtf8, Utf8]), + Exact(vec![Binary, Utf8]), + Exact(vec![LargeBinary, Utf8]), + ], + Volatility::Immutable, + ) + } + } +} +impl ScalarUDFImpl for DecodeFunc { + fn as_any(&self) -> &dyn Any { + self + } + fn name(&self) -> &str { + "decode" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + use DataType::*; + + Ok(match arg_types[0] { + Utf8 => Binary, + LargeUtf8 => LargeBinary, + Binary => Binary, + LargeBinary => LargeBinary, + Null => Null, + _ => { + return plan_err!("The decode function can only accept utf8 or binary."); + } + }) + } + + fn invoke(&self, args: &[ColumnarValue]) -> Result { + // Put a feature flag here to make sure this is only compiled when the feature is activated + super::inner::decode(args) + } +} + #[derive(Debug, Copy, Clone)] enum Encoding { Base64, Hex, } - fn encode_process(value: &ColumnarValue, encoding: Encoding) -> Result { match value { ColumnarValue::Array(a) => match a.data_type() { diff --git a/datafusion/functions/src/encoding/mod.rs b/datafusion/functions/src/encoding/mod.rs new file mode 100644 index 000000000000..7b0c6d0ff32f --- /dev/null +++ b/datafusion/functions/src/encoding/mod.rs @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod inner; + +// create `encode` and `decode` functions +make_udf_function!(inner::EncodeFunc, ENCODE, encode); +make_udf_function!(inner::DecodeFunc, DECODE, decode); + +// Export the functions out of this package, both as expr_fn as well as a list of functions +export_functions!(encode, decode); + diff --git a/datafusion/functions/src/lib.rs b/datafusion/functions/src/lib.rs new file mode 100644 index 000000000000..cac0ac9d092d --- /dev/null +++ b/datafusion/functions/src/lib.rs @@ -0,0 +1,89 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Function packages for DataFusion +//! +//! Each package is a implemented as a separate +//! module, which can be activated by a feature flag. +//! +//! +//! # Available packages +//! * [`encoding`]: feature `encoding_expressions`: Hex and binary `encode` and `decode` functions +//! +//! # Using a package +//! You can register all functions in all packages using the [`register_all`] function. +//! +//! To register a certain package of functions: +//! ``` +//! # fn main() -> datafusion_common::Result<()> { +//! # let mut registry = datafusion_execution::registry::MemoryFunctionRegistry::new(); +//! # use datafusion_execution::FunctionRegistry; +//! // get the encoding functions +//! use datafusion_functions::encoding; +//! for udf in encoding::functions() { +//! registry.register_udf(udf)?; +//! } +//! # Ok(()) +//! # } +//! ``` +//! +//! You can also use the "expr_fn" module to invoke functions in a fluent style: +//! ``` +//! // create an Expr that will invoke the encode function +//! use datafusion_expr::{col, lit}; +//! use datafusion_functions::expr_fn; +//! // encode(my_data, 'hex') +//! let expr = expr_fn::encode(vec![col("my_data"), lit("hex")]); +//! ``` +//! +//! # Implementing a new package +//! +//! To add a new package: +//! +//! 1. Create a new module with the appropriate `ScalarUDF`s +//! +//! 2. Add the new feature flag to `Cargo.toml`, with any optional dependencies +//! +//! 3. export any `expr_fn`s in `expr_fn.rs` +//! +//! 2. 
Use the `make_package` macro to export the module if the specified feature is enabled +use datafusion_common::Result; +use datafusion_execution::FunctionRegistry; +use log::debug; + +#[macro_use] +mod macros; + +make_package!(encoding, "encoding_expressions"); + +/// Fluent-style API for creating `Expr`s to invoke functions +pub mod expr_fn { + #[cfg(feature = "encoding_expressions")] + pub use super::encoding::expr_fn::*; +} + +/// Registers all enabled packages with a [`FunctionRegistry`] +pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> { + encoding::functions().into_iter().try_for_each(|udf| { + let existing_udf = registry.register_udf(udf)?; + if let Some(existing_udf) = existing_udf { + debug!("Overwrite existing UDF: {}", existing_udf.name()); + } + Ok(()) as Result<()> + })?; + Ok(()) +} diff --git a/datafusion/functions/src/macros.rs b/datafusion/functions/src/macros.rs new file mode 100644 index 000000000000..7ee6282fb2cb --- /dev/null +++ b/datafusion/functions/src/macros.rs @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/// macro that exports a list of function names as: +/// 1. individual functions in an `expr_fn` module +/// 2. a single function that returns a list of all functions +/// +/// Equivalent to +/// ```text +/// pub mod expr_fn { +/// use super::*; +/// /// Return encode(arg) +/// pub fn encode(args: Vec) -> Expr { +/// super::encode().call(args) +/// } +/// ... +/// /// Return a list of all functions in this package +/// pub(crate) fn functions() -> Vec> { +/// vec![ +/// encode(), +/// decode() +/// ] +/// } +/// ``` +macro_rules! export_functions { + ($($name:ident),*) => { + pub mod expr_fn { + $( + /// Return $name(arg) + pub fn $name(args: Vec) -> datafusion_expr::Expr { + super::$name().call(args) + } + )* + } + + /// Return a list of all functions in this package + pub fn functions() -> Vec> { + vec![ + $( + $name(), + )* + ] + } + }; +} + +/// Creates a singleton `ScalarUDF` of the `$UDF` function named `$GNAME` and a +/// function named `$NAME` which returns that function named $NAME. +/// +/// This is used to ensure creating the list of `ScalarUDF` only happens once. +macro_rules! make_udf_function { + ($UDF:ty, $GNAME:ident, $NAME:ident) => { + /// Singleton instance of the function + static $GNAME: std::sync::OnceLock> = + std::sync::OnceLock::new(); + + /// Return a [`ScalarUDF`] for [`$UDF`] + fn $NAME() -> std::sync::Arc { + $GNAME + .get_or_init(|| { + std::sync::Arc::new(datafusion_expr::ScalarUDF::new_from_impl( + <$UDF>::new(), + )) + }) + .clone() + } + }; +} + +/// Macro creates the named module if the feature is enabled +/// otherwise creates a stub +/// +/// Which returns: +/// +/// 1. 
The list of actual function implementation when the relevant +/// feature is activated, +/// +/// 2. A list of stub function when the feature is not activated that produce +/// a runtime error (and explain what feature flag is needed to activate them). +/// +/// The rationale for providing stub functions is to help users to configure datafusion +/// properly (so they get an error telling them why a function is not available) +/// instead of getting a cryptic "no function found" message at runtime. + +macro_rules! make_package { + ($name:ident, $feature:literal) => { + #[cfg(feature = $feature)] + pub mod $name; + + #[cfg(not(feature = $feature))] + /// Stub module when feature is not enabled + pub mod $name { + use datafusion_expr::ScalarUDF; + use log::debug; + use std::sync::Arc; + + /// Returns an empty list of functions when the feature is not enabled + pub fn functions() -> Vec> { + debug!("{} functions disabled", stringify!($name)); + vec![] + } + } + }; +} diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs index 53de85843919..8b08e6f92793 100644 --- a/datafusion/physical-expr/src/functions.rs +++ b/datafusion/physical-expr/src/functions.rs @@ -83,26 +83,6 @@ pub fn create_physical_expr( ))) } -#[cfg(feature = "encoding_expressions")] -macro_rules! invoke_if_encoding_expressions_feature_flag { - ($FUNC:ident, $NAME:expr) => {{ - use crate::encoding_expressions; - encoding_expressions::$FUNC - }}; -} - -#[cfg(not(feature = "encoding_expressions"))] -macro_rules! invoke_if_encoding_expressions_feature_flag { - ($FUNC:ident, $NAME:expr) => { - |_: &[ColumnarValue]| -> Result { - internal_err!( - "function {} requires compilation with feature flag: encoding_expressions.", - $NAME - ) - } - }; -} - #[cfg(feature = "crypto_expressions")] macro_rules! 
invoke_if_crypto_expressions_feature_flag { ($FUNC:ident, $NAME:expr) => {{ @@ -578,12 +558,6 @@ pub fn create_physical_fun( BuiltinScalarFunction::Digest => { Arc::new(invoke_if_crypto_expressions_feature_flag!(digest, "digest")) } - BuiltinScalarFunction::Decode => Arc::new( - invoke_if_encoding_expressions_feature_flag!(decode, "decode"), - ), - BuiltinScalarFunction::Encode => Arc::new( - invoke_if_encoding_expressions_feature_flag!(encode, "encode"), - ), BuiltinScalarFunction::NullIf => Arc::new(nullif_func), BuiltinScalarFunction::OctetLength => Arc::new(|args| match &args[0] { ColumnarValue::Array(v) => Ok(ColumnarValue::Array(length(v.as_ref())?)), diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index fffa8f602d87..208a57486ea3 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -22,8 +22,6 @@ pub mod conditional_expressions; #[cfg(feature = "crypto_expressions")] pub mod crypto_expressions; pub mod datetime_expressions; -#[cfg(feature = "encoding_expressions")] -pub mod encoding_expressions; pub mod equivalence; pub mod execution_props; pub mod expressions; diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index d5f8397aa30c..a9797cac0649 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -638,8 +638,6 @@ enum ScalarFunction { Cardinality = 98; ArrayElement = 99; ArraySlice = 100; - Encode = 101; - Decode = 102; Cot = 103; ArrayHas = 104; ArrayHasAny = 105; diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 12e834d75adf..c54e3a46f8e7 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -22303,8 +22303,6 @@ impl serde::Serialize for ScalarFunction { Self::Cardinality => "Cardinality", Self::ArrayElement => "ArrayElement", Self::ArraySlice => "ArraySlice", - Self::Encode => "Encode", - Self::Decode => "Decode", Self::Cot => "Cot", Self::ArrayHas => "ArrayHas", Self::ArrayHasAny => "ArrayHasAny", @@ -22444,8 +22442,6 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "Cardinality", "ArrayElement", "ArraySlice", - "Encode", - "Decode", "Cot", "ArrayHas", "ArrayHasAny", @@ -22614,8 +22610,6 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction { "Cardinality" => Ok(ScalarFunction::Cardinality), "ArrayElement" => Ok(ScalarFunction::ArrayElement), "ArraySlice" => Ok(ScalarFunction::ArraySlice), - "Encode" => Ok(ScalarFunction::Encode), - "Decode" => Ok(ScalarFunction::Decode), "Cot" => Ok(ScalarFunction::Cot), "ArrayHas" => Ok(ScalarFunction::ArrayHas), "ArrayHasAny" => Ok(ScalarFunction::ArrayHasAny), diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 4ee0b70325ca..a0a027e65f95 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -2725,8 +2725,6 @@ pub enum ScalarFunction { Cardinality = 98, ArrayElement = 99, ArraySlice = 100, - Encode = 101, - Decode = 102, Cot = 103, ArrayHas = 104, ArrayHasAny = 105, @@ -2863,8 +2861,6 @@ impl ScalarFunction { ScalarFunction::Cardinality => "Cardinality", ScalarFunction::ArrayElement => "ArrayElement", ScalarFunction::ArraySlice => "ArraySlice", - ScalarFunction::Encode => "Encode", - ScalarFunction::Decode => "Decode", ScalarFunction::Cot => "Cot", ScalarFunction::ArrayHas => "ArrayHas", ScalarFunction::ArrayHasAny => "ArrayHasAny", @@ -2998,8 +2994,6 @@ impl 
ScalarFunction { "Cardinality" => Some(Self::Cardinality), "ArrayElement" => Some(Self::ArrayElement), "ArraySlice" => Some(Self::ArraySlice), - "Encode" => Some(Self::Encode), - "Decode" => Some(Self::Decode), "Cot" => Some(Self::Cot), "ArrayHas" => Some(Self::ArrayHas), "ArrayHasAny" => Some(Self::ArrayHasAny), diff --git a/datafusion/proto/src/logical_plan/from_proto.rs b/datafusion/proto/src/logical_plan/from_proto.rs index 36c5b44f00b9..e97b0d3bc82e 100644 --- a/datafusion/proto/src/logical_plan/from_proto.rs +++ b/datafusion/proto/src/logical_plan/from_proto.rs @@ -50,7 +50,7 @@ use datafusion_expr::{ array_to_string, arrow_typeof, ascii, asin, asinh, atan, atan2, atanh, bit_length, btrim, cardinality, cbrt, ceil, character_length, chr, coalesce, concat_expr, concat_ws_expr, cos, cosh, cot, current_date, current_time, date_bin, date_part, - date_trunc, decode, degrees, digest, encode, exp, + date_trunc, degrees, digest, exp, expr::{self, InList, Sort, WindowFunction}, factorial, find_in_set, flatten, floor, from_unixtime, gcd, gen_range, isnan, iszero, lcm, left, levenshtein, ln, log, log10, log2, @@ -519,8 +519,6 @@ impl From<&protobuf::ScalarFunction> for BuiltinScalarFunction { ScalarFunction::Sha384 => Self::SHA384, ScalarFunction::Sha512 => Self::SHA512, ScalarFunction::Digest => Self::Digest, - ScalarFunction::Encode => Self::Encode, - ScalarFunction::Decode => Self::Decode, ScalarFunction::ToTimestampMillis => Self::ToTimestampMillis, ScalarFunction::Log2 => Self::Log2, ScalarFunction::Signum => Self::Signum, @@ -1553,14 +1551,6 @@ pub fn parse_expr( ScalarFunction::Sha384 => Ok(sha384(parse_expr(&args[0], registry)?)), ScalarFunction::Sha512 => Ok(sha512(parse_expr(&args[0], registry)?)), ScalarFunction::Md5 => Ok(md5(parse_expr(&args[0], registry)?)), - ScalarFunction::Encode => Ok(encode( - parse_expr(&args[0], registry)?, - parse_expr(&args[1], registry)?, - )), - ScalarFunction::Decode => Ok(decode( - parse_expr(&args[0], registry)?, - parse_expr(&args[1], registry)?, - )), ScalarFunction::NullIf => Ok(nullif( parse_expr(&args[0], registry)?, parse_expr(&args[1], registry)?, diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs index a162b2389cd1..094a07f40ecb 100644 --- a/datafusion/proto/src/logical_plan/to_proto.rs +++ b/datafusion/proto/src/logical_plan/to_proto.rs @@ -1546,8 +1546,6 @@ impl TryFrom<&BuiltinScalarFunction> for protobuf::ScalarFunction { BuiltinScalarFunction::SHA384 => Self::Sha384, BuiltinScalarFunction::SHA512 => Self::Sha512, BuiltinScalarFunction::Digest => Self::Digest, - BuiltinScalarFunction::Decode => Self::Decode, - BuiltinScalarFunction::Encode => Self::Encode, BuiltinScalarFunction::ToTimestampMillis => Self::ToTimestampMillis, BuiltinScalarFunction::Log2 => Self::Log2, BuiltinScalarFunction::Signum => Self::Signum, diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs index dea99f91e392..a913d2d19f64 100644 --- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs @@ -34,7 +34,9 @@ use datafusion::execution::context::SessionState; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::parquet::file::properties::{WriterProperties, WriterVersion}; use datafusion::physical_plan::functions::make_scalar_function; -use datafusion::prelude::{create_udf, CsvReadOptions, SessionConfig, SessionContext}; +use datafusion::prelude::{ + 
create_udf, decode, encode, CsvReadOptions, SessionConfig, SessionContext, +}; use datafusion::test_util::{TestTableFactory, TestTableProvider}; use datafusion_common::file_options::csv_writer::CsvWriterOptions; use datafusion_common::file_options::parquet_writer::ParquetWriterOptions; @@ -52,8 +54,9 @@ use datafusion_expr::logical_plan::{Extension, UserDefinedLogicalNodeCore}; use datafusion_expr::{ col, create_udaf, lit, Accumulator, AggregateFunction, BuiltinScalarFunction::{Sqrt, Substr}, - Expr, LogicalPlan, Operator, PartitionEvaluator, Signature, TryCast, Volatility, - WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, WindowUDF, + Expr, ExprSchemable, LogicalPlan, Operator, PartitionEvaluator, Signature, TryCast, + Volatility, WindowFrame, WindowFrameBound, WindowFrameUnits, + WindowFunctionDefinition, WindowUDF, }; use datafusion_proto::bytes::{ logical_plan_from_bytes, logical_plan_from_bytes_with_extension_codec, @@ -524,6 +527,30 @@ async fn roundtrip_logical_plan_with_extension() -> Result<()> { Ok(()) } +#[tokio::test] +async fn roundtrip_expr_api() -> Result<()> { + let ctx = SessionContext::new(); + ctx.register_csv("t1", "tests/testdata/test.csv", CsvReadOptions::default()) + .await?; + let table = ctx.table("t1").await?; + let schema = table.schema().clone(); + + // ensure expressions created with the expr api can be round tripped + let plan = table + .select(vec![ + encode(vec![ + col("a").cast_to(&DataType::Utf8, &schema)?, + lit("hex"), + ]), + decode(vec![lit("1234"), lit("hex")]), + ])? + .into_optimized_plan()?; + let bytes = logical_plan_to_bytes(&plan)?; + let logical_round_trip = logical_plan_from_bytes(&bytes, &ctx)?; + assert_eq!(format!("{plan:?}"), format!("{logical_round_trip:?}")); + Ok(()) +} + #[tokio::test] async fn roundtrip_logical_plan_with_view_scan() -> Result<()> { let ctx = SessionContext::new(); From 1a1e0f4c828d55bab19766f84e0504eaa363e78a Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 2 Jan 2024 16:24:02 -0500 Subject: [PATCH 02/12] better docs --- datafusion/execution/src/registry.rs | 2 +- datafusion/functions/src/encoding/mod.rs | 5 +++- datafusion/functions/src/lib.rs | 33 +++++++++++++++--------- datafusion/functions/src/macros.rs | 7 +++-- 4 files changed, 31 insertions(+), 16 deletions(-) diff --git a/datafusion/execution/src/registry.rs b/datafusion/execution/src/registry.rs index 8b538beed18c..4d5b80f054df 100644 --- a/datafusion/execution/src/registry.rs +++ b/datafusion/execution/src/registry.rs @@ -66,7 +66,7 @@ pub trait SerializerRegistry: Send + Sync { ) -> Result>; } -/// A [`FunctionRegistry`] that uses in memory HashMaps +/// A [`FunctionRegistry`] that uses in memory [`HashMap`]s #[derive(Default, Debug)] pub struct MemoryFunctionRegistry { /// Scalar Functions diff --git a/datafusion/functions/src/encoding/mod.rs b/datafusion/functions/src/encoding/mod.rs index 7b0c6d0ff32f..8ca252b219b5 100644 --- a/datafusion/functions/src/encoding/mod.rs +++ b/datafusion/functions/src/encoding/mod.rs @@ -22,5 +22,8 @@ make_udf_function!(inner::EncodeFunc, ENCODE, encode); make_udf_function!(inner::DecodeFunc, DECODE, decode); // Export the functions out of this package, both as expr_fn as well as a list of functions -export_functions!(encode, decode); +export_functions!( + (encode, "encode the `input`, using the `encoding`. encoding can be base64 or hex"), + (decode, "decode the`input`, using the `encoding`. 
encoding can be base64 or hex") +); diff --git a/datafusion/functions/src/lib.rs b/datafusion/functions/src/lib.rs index cac0ac9d092d..874ea713b000 100644 --- a/datafusion/functions/src/lib.rs +++ b/datafusion/functions/src/lib.rs @@ -21,13 +21,13 @@ //! module, which can be activated by a feature flag. //! //! -//! # Available packages -//! * [`encoding`]: feature `encoding_expressions`: Hex and binary `encode` and `decode` functions +//! # Available Packages +//! See the list of modules in this crate for available packages. //! -//! # Using a package +//! # Using Package //! You can register all functions in all packages using the [`register_all`] function. //! -//! To register a certain package of functions: +//! To register only the functions in a certain package, you can do: //! ``` //! # fn main() -> datafusion_common::Result<()> { //! # let mut registry = datafusion_execution::registry::MemoryFunctionRegistry::new(); @@ -41,7 +41,9 @@ //! # } //! ``` //! -//! You can also use the "expr_fn" module to invoke functions in a fluent style: +//! You can also use the "expr_fn" module to create [`Expr`]s that invoke +//! functions in a fluent style: +//! //! ``` //! // create an Expr that will invoke the encode function //! use datafusion_expr::{col, lit}; @@ -50,17 +52,20 @@ //! let expr = expr_fn::encode(vec![col("my_data"), lit("hex")]); //! ``` //! -//! # Implementing a new package +//![`Expr`]: datafusion_expr::Expr +//! +//! # Implementing A New Package //! -//! To add a new package: +//! To add a new package to this crate:: //! -//! 1. Create a new module with the appropriate `ScalarUDF`s +//! 1. Create a new module with the appropriate `ScalarUDF` implementations. //! -//! 2. Add the new feature flag to `Cargo.toml`, with any optional dependencies +//! 2. Use the `make_udf_function!` and `export_functions!` macros to create +//! standard entry points //! -//! 3. export any `expr_fn`s in `expr_fn.rs` +//! 3. Add a new feature flag to `Cargo.toml`, with any optional dependencies //! -//! 2. Use the `make_package` macro to export the module if the specified feature is enabled +//! 4. Use the `make_package!` macro to export the module if the specified feature is enabled use datafusion_common::Result; use datafusion_execution::FunctionRegistry; use log::debug; @@ -68,7 +73,11 @@ use log::debug; #[macro_use] mod macros; -make_package!(encoding, "encoding_expressions"); +make_package!( + encoding, + "encoding_expressions", + "Hex and binary `encode` and `decode` functions" +); /// Fluent-style API for creating `Expr`s to invoke functions pub mod expr_fn { diff --git a/datafusion/functions/src/macros.rs b/datafusion/functions/src/macros.rs index 7ee6282fb2cb..73e25b3765c2 100644 --- a/datafusion/functions/src/macros.rs +++ b/datafusion/functions/src/macros.rs @@ -40,6 +40,7 @@ macro_rules! export_functions { ($($name:ident),*) => { pub mod expr_fn { $( + #[doc = concat!("Call the `", $name, "`function")] /// Return $name(arg) pub fn $name(args: Vec) -> datafusion_expr::Expr { super::$name().call(args) @@ -97,12 +98,14 @@ macro_rules! make_udf_function { /// instead of getting a cryptic "no function found" message at runtime. macro_rules! make_package { - ($name:ident, $feature:literal) => { + ($name:ident, $feature:literal, $DOC:expr) => { #[cfg(feature = $feature)] + #[doc = $DOC ] + #[doc = concat!("Enabled via feature flag `", $feature, "`")] pub mod $name; #[cfg(not(feature = $feature))] - /// Stub module when feature is not enabled + #[doc = concat!("Disabled. 
Enable via feature flag `", $feature, "`")] pub mod $name { use datafusion_expr::ScalarUDF; use log::debug; From 259f12234bae9a2f2a350f8a8ae07c0964a78dea Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 15 Jan 2024 09:04:20 -0500 Subject: [PATCH 03/12] Improve docs + macros --- .../tests/dataframe/dataframe_functions.rs | 4 +- datafusion/expr/src/expr_fn.rs | 2 +- datafusion/functions/src/encoding/mod.rs | 7 ++-- datafusion/functions/src/lib.rs | 41 +++++++++++-------- datafusion/functions/src/macros.rs | 10 ++--- .../tests/cases/roundtrip_logical_plan.rs | 13 +++--- 6 files changed, 42 insertions(+), 35 deletions(-) diff --git a/datafusion/core/tests/dataframe/dataframe_functions.rs b/datafusion/core/tests/dataframe/dataframe_functions.rs index 864ce5f8b3ac..0d9027abc8b4 100644 --- a/datafusion/core/tests/dataframe/dataframe_functions.rs +++ b/datafusion/core/tests/dataframe/dataframe_functions.rs @@ -759,7 +759,7 @@ async fn test_fn_upper() -> Result<()> { #[tokio::test] async fn test_fn_encode() -> Result<()> { - let expr = encode(vec![col("a"), lit("hex")]); + let expr = encode(col("a"), lit("hex")); let expected = [ "+----------------------------+", @@ -782,7 +782,7 @@ async fn test_fn_decode() -> Result<()> { // binary is "hexadecimal" and therefore the output looks like decode did // nothing. So compare to a constant. let df_schema = DFSchema::try_from(test_schema().as_ref().clone())?; - let expr = decode(vec![encode(vec![col("a"), lit("hex")]), lit("hex")]) + let expr = decode(encode(col("a"), lit("hex")), lit("hex")) // need to cast to utf8 otherwise the default display of binary array is hex // so it looks like nothing is done .cast_to(&DataType::Utf8, &df_schema)?; diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs index a331f1c5896c..4bf2546b9f3f 100644 --- a/datafusion/expr/src/expr_fn.rs +++ b/datafusion/expr/src/expr_fn.rs @@ -496,7 +496,7 @@ pub fn is_not_unknown(expr: Expr) -> Expr { macro_rules! scalar_expr { ($ENUM:ident, $FUNC:ident, $($arg:ident)*, $DOC:expr) => { - #[doc = $DOC ] + #[doc = $DOC] pub fn $FUNC($($arg: Expr),*) -> Expr { Expr::ScalarFunction(ScalarFunction::new( built_in_function::BuiltinScalarFunction::$ENUM, diff --git a/datafusion/functions/src/encoding/mod.rs b/datafusion/functions/src/encoding/mod.rs index 8ca252b219b5..7bb1a0ea3aa3 100644 --- a/datafusion/functions/src/encoding/mod.rs +++ b/datafusion/functions/src/encoding/mod.rs @@ -17,13 +17,14 @@ mod inner; -// create `encode` and `decode` functions + +// create `encode` and `decode` UDFs make_udf_function!(inner::EncodeFunc, ENCODE, encode); make_udf_function!(inner::DecodeFunc, DECODE, decode); // Export the functions out of this package, both as expr_fn as well as a list of functions export_functions!( - (encode, "encode the `input`, using the `encoding`. encoding can be base64 or hex"), - (decode, "decode the`input`, using the `encoding`. encoding can be base64 or hex") + (encode, input encoding, "encode the `input`, using the `encoding`. encoding can be base64 or hex"), + (decode, input encoding, "decode the `input`, using the `encoding`. encoding can be base64 or hex") ); diff --git a/datafusion/functions/src/lib.rs b/datafusion/functions/src/lib.rs index 874ea713b000..406a86dda9e8 100644 --- a/datafusion/functions/src/lib.rs +++ b/datafusion/functions/src/lib.rs @@ -15,19 +15,25 @@ // specific language governing permissions and limitations // under the License. -//! Function packages for DataFusion +//! Function packages for DataFusion. //! -//! 
Each package is a implemented as a separate -//! module, which can be activated by a feature flag. +//! This crate contains a collection of various function packages for DataFusion, +//! implemented using the extension API. Users may wish to control which functions +//! are available to control the binary size of their application as well as +//! use dialect specific implementations of functions (e.g. Spark vs Postgres) //! +//! Each package is a implemented as a separate +//! module, activated by a feature flag. //! //! # Available Packages -//! See the list of modules in this crate for available packages. +//! See the list of [modules](#modules) in this crate for available packages. //! -//! # Using Package +//! # Using A Package //! You can register all functions in all packages using the [`register_all`] function. //! -//! To register only the functions in a certain package, you can do: +//! To access and use only the functions in a certain package, use the +//! `functions()` method in each module. +//! //! ``` //! # fn main() -> datafusion_common::Result<()> { //! # let mut registry = datafusion_execution::registry::MemoryFunctionRegistry::new(); @@ -41,14 +47,14 @@ //! # } //! ``` //! -//! You can also use the "expr_fn" module to create [`Expr`]s that invoke -//! functions in a fluent style: +//! Each package also exports an `expr_fn` submodule to help create [`Expr`]s that invoke +//! functions using a fluent style. For example: //! //! ``` //! // create an Expr that will invoke the encode function //! use datafusion_expr::{col, lit}; //! use datafusion_functions::expr_fn; -//! // encode(my_data, 'hex') +//! // Equivalent to "encode(my_data, 'hex')" in SQL: //! let expr = expr_fn::encode(vec![col("my_data"), lit("hex")]); //! ``` //! @@ -56,22 +62,25 @@ //! //! # Implementing A New Package //! -//! To add a new package to this crate:: +//! To add a new package to this crate, you should follow the model of existing +//! packages. The high level steps are: +//! +//! 1. Create a new module with the appropriate [`ScalarUDF`] implementations. //! -//! 1. Create a new module with the appropriate `ScalarUDF` implementations. +//! 2. Use the macros in [`macros`] to create standard entry points. //! -//! 2. Use the `make_udf_function!` and `export_functions!` macros to create -//! standard entry points +//! 3. Add a new feature to `Cargo.toml`, with any optional dependencies //! -//! 3. Add a new feature flag to `Cargo.toml`, with any optional dependencies +//! 4. Use the `make_package!` macro to expose the module when the +//! feature is enabled. //! -//! 4. Use the `make_package!` macro to export the module if the specified feature is enabled +//! [`ScalarUDF`]: datafusion_expr::ScalarUDF use datafusion_common::Result; use datafusion_execution::FunctionRegistry; use log::debug; #[macro_use] -mod macros; +pub mod macros; make_package!( encoding, diff --git a/datafusion/functions/src/macros.rs b/datafusion/functions/src/macros.rs index 73e25b3765c2..25f23d4fdab9 100644 --- a/datafusion/functions/src/macros.rs +++ b/datafusion/functions/src/macros.rs @@ -37,13 +37,13 @@ /// } /// ``` macro_rules! 
export_functions {
-    ($($name:ident),*) => {
+    ($(($FUNC:ident, $($arg:ident)*, $DOC:expr)),*) => {
         pub mod expr_fn {
             $(
-                #[doc = concat!("Call the `", $name, "`function")]
+                #[doc = $DOC]
                 /// Return $name(arg)
-                pub fn $name(args: Vec<datafusion_expr::Expr>) -> datafusion_expr::Expr {
-                    super::$name().call(args)
+                pub fn $FUNC($($arg: datafusion_expr::Expr),*) -> datafusion_expr::Expr {
+                    super::$FUNC().call(vec![$($arg),*],)
                 }
             )*
         }
@@ -52,7 +52,7 @@ macro_rules! export_functions {
         pub fn functions() -> Vec<std::sync::Arc<datafusion_expr::ScalarUDF>> {
             vec![
                 $(
-                    $name(),
+                    $FUNC(),
                 )*
             ]
         }
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index daf32a207a5c..828a96cefbc4 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -55,9 +55,9 @@ use datafusion_expr::logical_plan::{Extension, UserDefinedLogicalNodeCore};
 use datafusion_expr::{
     col, create_udaf, lit, Accumulator, AggregateFunction,
     BuiltinScalarFunction::{Sqrt, Substr},
-    Expr, LogicalPlan, Operator, PartitionEvaluator, Signature, TryCast, Volatility,
-    WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, WindowUDF,
-    WindowUDFImpl,
+    Expr, ExprSchemable, LogicalPlan, Operator, PartitionEvaluator, Signature, TryCast,
+    Volatility, WindowFrame, WindowFrameBound, WindowFrameUnits,
+    WindowFunctionDefinition, WindowUDF, WindowUDFImpl,
 };
 use datafusion_proto::bytes::{
     logical_plan_from_bytes, logical_plan_from_bytes_with_extension_codec,
@@ -539,11 +539,8 @@ async fn roundtrip_expr_api() -> Result<()> {
     // ensure expressions created with the expr api can be round tripped
     let plan = table
         .select(vec![
-            encode(vec![
-                col("a").cast_to(&DataType::Utf8, &schema)?,
-                lit("hex"),
-            ]),
-            decode(vec![lit("1234"), lit("hex")]),
+            encode(col("a").cast_to(&DataType::Utf8, &schema)?, lit("hex")),
+            decode(lit("1234"), lit("hex")),
         ])?
         .into_optimized_plan()?;
     let bytes = logical_plan_to_bytes(&plan)?;

From 68b92899f54a561c51b0a7e7fddf6ed5ba242e9f Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Mon, 15 Jan 2024 09:08:14 -0500
Subject: [PATCH 04/12] tweaks

---
 datafusion/functions/src/lib.rs | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/datafusion/functions/src/lib.rs b/datafusion/functions/src/lib.rs
index 406a86dda9e8..5b5141ed2df9 100644
--- a/datafusion/functions/src/lib.rs
+++ b/datafusion/functions/src/lib.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Function packages for DataFusion.
+//! Function packages for [DataFusion].
 //!
 //! This crate contains a collection of various function packages for DataFusion,
 //! implemented using the extension API. Users may wish to control which functions
@@ -25,6 +25,8 @@
 //! Each package is a implemented as a separate
 //! module, activated by a feature flag.
 //!
+//! [DataFusion]: https://crates.io/crates/datafusion
+//!
 //! # Available Packages
 //! See the list of [modules](#modules) in this crate for available packages.
 //!
@@ -85,10 +87,10 @@ pub mod macros;
 make_package!(
     encoding,
     "encoding_expressions",
-    "Hex and binary `encode` and `decode` functions"
+    "Hex and binary `encode` and `decode` functions."
 );
 
-/// Fluent-style API for creating `Expr`s to invoke functions
+/// Fluent-style API for creating `Expr`s
 pub mod expr_fn {
     #[cfg(feature = "encoding_expressions")]
     pub use super::encoding::expr_fn::*;

From 5deddb01a4e3b8bea892cf8af6f33a370fa9d761 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Mon, 15 Jan 2024 09:09:16 -0500
Subject: [PATCH 05/12] updates

---
 datafusion/functions/README.md | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/datafusion/functions/README.md b/datafusion/functions/README.md
index 214cc8ff0af5..a610d135c0f6 100644
--- a/datafusion/functions/README.md
+++ b/datafusion/functions/README.md
@@ -21,6 +21,7 @@
 
 [DataFusion][df] is an extensible query execution framework, written in Rust, that uses Apache Arrow as its in-memory format.
 
-This crate contains several packages of function that can be used with DataFusion.
+This crate contains packages of functions that can be used to customize the
+functionality of DataFusion.
 
 [df]: https://crates.io/crates/datafusion

From f75472cc07d3c6922f65a2492bd675c3d4602cfd Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Mon, 15 Jan 2024 10:05:19 -0500
Subject: [PATCH 06/12] fix doc

---
 datafusion/functions/src/macros.rs | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/datafusion/functions/src/macros.rs b/datafusion/functions/src/macros.rs
index 25f23d4fdab9..1931ee279421 100644
--- a/datafusion/functions/src/macros.rs
+++ b/datafusion/functions/src/macros.rs
@@ -70,6 +70,8 @@ macro_rules! make_udf_function {
             std::sync::OnceLock::new();
 
         /// Return a [`ScalarUDF`] for [`$UDF`]
+        ///
+        /// [`ScalarUDF`]: datafusion_expr::ScalarUDF
        fn $NAME() -> std::sync::Arc<datafusion_expr::ScalarUDF> {
             $GNAME
                 .get_or_init(|| {

From 3d3d03a8909b1d65d1049add7780c1964a96062d Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Mon, 15 Jan 2024 11:30:19 -0500
Subject: [PATCH 07/12] tomlfmt

---
 datafusion/core/Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 7d39f8314aee..2fed737c1b60 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -65,7 +65,7 @@ dashmap = { workspace = true }
 datafusion-common = { path = "../common", version = "34.0.0", features = ["object_store"], default-features = false }
 datafusion-execution = { workspace = true }
 datafusion-expr = { workspace = true }
-datafusion-functions = { path = "../functions", version = "34.0.0"}
+datafusion-functions = { path = "../functions", version = "34.0.0" }
 datafusion-optimizer = { path = "../optimizer", version = "34.0.0", default-features = false }
 datafusion-physical-expr = { path = "../physical-expr", version = "34.0.0", default-features = false }
 datafusion-physical-plan = { workspace = true }

From 6f961be1eabad5b3fe27acbba699e3eb749d8193 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Mon, 15 Jan 2024 11:32:12 -0500
Subject: [PATCH 08/12] fix doc

---
 datafusion/functions/src/lib.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/functions/src/lib.rs b/datafusion/functions/src/lib.rs
index 5b5141ed2df9..6d4cbd7b7298 100644
--- a/datafusion/functions/src/lib.rs
+++ b/datafusion/functions/src/lib.rs
@@ -57,7 +57,7 @@
 //! use datafusion_expr::{col, lit};
 //! use datafusion_functions::expr_fn;
 //! // Equivalent to "encode(my_data, 'hex')" in SQL:
-//! let expr = expr_fn::encode(vec![col("my_data"), lit("hex")]);
+//! let expr = expr_fn::encode(col("my_data"), lit("hex"));
 //! ```
 //!
//![`Expr`]: datafusion_expr::Expr From 2d21c357ee87e7133158ca0729c124cfc702945c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Tue, 23 Jan 2024 07:29:41 -0500 Subject: [PATCH 09/12] update datafusion-cli Cargo.locl --- datafusion-cli/Cargo.lock | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index c8850323949f..41c8dd162801 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -867,15 +867,15 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.31" +version = "0.4.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38" +checksum = "41daef31d7a747c5c847246f36de49ced6f7403b4cdabc807a97b5cc184cda7a" dependencies = [ "android-tzdata", "iana-time-zone", "num-traits", "serde", - "windows-targets 0.48.5", + "windows-targets 0.52.0", ] [[package]] @@ -1227,7 +1227,7 @@ dependencies = [ [[package]] name = "datafusion-functions" -version = "34.0.0" +version = "35.0.0" dependencies = [ "arrow", "base64", @@ -2593,9 +2593,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.76" +version = "1.0.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95fc56cda0b5c3325f5fbbd7ff9fda9e02bb00bb3dac51252d2f1bfa1cb8cc8c" +checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" dependencies = [ "unicode-ident", ] @@ -2687,9 +2687,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.2" +version = "1.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" +checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15" dependencies = [ "aho-corasick", "memchr", @@ -2699,9 +2699,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.3" +version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" +checksum = "3b7fa1134405e2ec9353fd416b17f8dacd46c473d7d3fd1cf202706a14eb792a" dependencies = [ "aho-corasick", "memchr", From cc6268e2a55bd05147e4e190f624c76a0ab8d226 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 27 Jan 2024 05:02:44 -0500 Subject: [PATCH 10/12] update datafusion-cli cargo.lock --- datafusion-cli/Cargo.lock | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 41c8dd162801..a5aa01358345 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -867,9 +867,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.32" +version = "0.4.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41daef31d7a747c5c847246f36de49ced6f7403b4cdabc807a97b5cc184cda7a" +checksum = "9f13690e35a5e4ace198e7beea2895d29f3a9cc55015fcebe6336bd2010af9eb" dependencies = [ "android-tzdata", "iana-time-zone", @@ -2489,18 +2489,18 @@ dependencies = [ [[package]] name = "pin-project" -version = "1.1.3" +version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fda4ed1c6c173e3fc7a83629421152e01d7b1f9b7f65fb301e490e8cfc656422" +checksum = 
"0302c4a0442c456bd56f841aee5c3bfd17967563f6fadc9ceb9f9c23cf3807e0" dependencies = [ "pin-project-internal", ] [[package]] name = "pin-project-internal" -version = "1.1.3" +version = "1.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405" +checksum = "266c042b60c9c76b8d53061e52b2e0d1116abc57cefc8c5cd671619a56ac3690" dependencies = [ "proc-macro2", "quote", @@ -2699,9 +2699,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.4" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b7fa1134405e2ec9353fd416b17f8dacd46c473d7d3fd1cf202706a14eb792a" +checksum = "5bb987efffd3c6d0d8f5f89510bb458559eab11e4f869acb20bf845e016259cd" dependencies = [ "aho-corasick", "memchr", @@ -3028,18 +3028,18 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.195" +version = "1.0.196" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63261df402c67811e9ac6def069e4786148c4563f4b50fd4bf30aa370d626b02" +checksum = "870026e60fa08c69f064aa766c10f10b1d62db9ccd4d0abb206472bee0ce3b32" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.195" +version = "1.0.196" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46fe8f8603d81ba86327b23a2e9cdf49e1255fb94a4c5f297f6ee0547178ea2c" +checksum = "33c85360c95e7d137454dc81d9a4ed2b8efd8fbe19cee57357b32b9771fccb67" dependencies = [ "proc-macro2", "quote", @@ -3048,9 +3048,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.111" +version = "1.0.112" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "176e46fa42316f18edd598015a5166857fc835ec732f5215eac6b7bdbf0a84f4" +checksum = "4d1bd37ce2324cf3bf85e5a25f96eb4baf0d5aa6eba43e7ae8958870c4ec48ed" dependencies = [ "itoa", "ryu", @@ -3162,9 +3162,9 @@ checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" [[package]] name = "sqlparser" -version = "0.41.0" +version = "0.43.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cc2c25a6c66789625ef164b4c7d2e548d627902280c13710d33da8222169964" +checksum = "f95c4bae5aba7cd30bd506f7140026ade63cff5afd778af8854026f9606bf5d4" dependencies = [ "log", "sqlparser_derive", From f3c311d986da2436e19ffcb675f044794ed64e98 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 29 Jan 2024 17:31:34 -0500 Subject: [PATCH 11/12] Remove outdated comment, make non pub --- datafusion/functions/src/encoding/inner.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/datafusion/functions/src/encoding/inner.rs b/datafusion/functions/src/encoding/inner.rs index 5ca15a7e6b01..886a031a5269 100644 --- a/datafusion/functions/src/encoding/inner.rs +++ b/datafusion/functions/src/encoding/inner.rs @@ -139,8 +139,7 @@ impl ScalarUDFImpl for DecodeFunc { } fn invoke(&self, args: &[ColumnarValue]) -> Result { - // Put a feature flag here to make sure this is only compiled when the feature is activated - super::inner::decode(args) + decode(args) } } @@ -404,7 +403,7 @@ impl FromStr for Encoding { /// Encodes the given data, accepts Binary, LargeBinary, Utf8 or LargeUtf8 and returns a [`ColumnarValue`]. /// Second argument is the encoding to use. /// Standard encodings are base64 and hex. 
-pub fn encode(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+fn encode(args: &[ColumnarValue]) -> Result<ColumnarValue> {
     if args.len() != 2 {
         return internal_err!(
             "{:?} args were supplied but encode takes exactly two arguments",
@@ -430,7 +429,7 @@ pub fn encode(args: &[ColumnarValue]) -> Result<ColumnarValue> {
 /// Decodes the given data, accepts Binary, LargeBinary, Utf8 or LargeUtf8 and returns a [`ColumnarValue`].
 /// Second argument is the encoding to use.
 /// Standard encodings are base64 and hex.
-pub fn decode(args: &[ColumnarValue]) -> Result<ColumnarValue> {
+fn decode(args: &[ColumnarValue]) -> Result<ColumnarValue> {
     if args.len() != 2 {
         return internal_err!(
             "{:?} args were supplied but decode takes exactly two arguments",

From 159cfbd32f9ac0fabaf4aa16b84b6ca82f574ce1 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Mon, 29 Jan 2024 17:33:27 -0500
Subject: [PATCH 12/12] Apply suggestions from code review

Co-authored-by: Liang-Chi Hsieh
---
 datafusion/functions/src/lib.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/datafusion/functions/src/lib.rs b/datafusion/functions/src/lib.rs
index 6d4cbd7b7298..91a5c510f0f9 100644
--- a/datafusion/functions/src/lib.rs
+++ b/datafusion/functions/src/lib.rs
@@ -22,7 +22,7 @@
 //! are available to control the binary size of their application as well as
 //! use dialect specific implementations of functions (e.g. Spark vs Postgres)
 //!
-//! Each package is a implemented as a separate
+//! Each package is implemented as a separate
 //! module, activated by a feature flag.
 //!
 //! [DataFusion]: https://crates.io/crates/datafusion
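
The doc examples added by these patches stop at constructing an `Expr`. As a companion, below is a minimal sketch (not part of the patch series) of how a downstream crate might exercise the relocated functions end to end. It assumes the `encoding_expressions` feature of `datafusion-functions` is enabled, and that `register_all` accepts a mutable reference to a `FunctionRegistry` such as the `MemoryFunctionRegistry` used in the `lib.rs` doctest; treat it as illustrative rather than authoritative.

```rust
// Sketch only: assumes the `encoding_expressions` feature is enabled and that
// `register_all` takes a `&mut` FunctionRegistry, as suggested by the doctest
// added in `datafusion/functions/src/lib.rs`.
use datafusion_common::Result;
use datafusion_execution::registry::MemoryFunctionRegistry;
use datafusion_expr::{col, lit};
use datafusion_functions::{expr_fn, register_all};

fn main() -> Result<()> {
    // Register every function package compiled into `datafusion-functions`.
    let mut registry = MemoryFunctionRegistry::new();
    register_all(&mut registry)?;

    // Build an Expr equivalent to "encode(my_data, 'hex')" in SQL, using the
    // two-argument `expr_fn::encode` form the later patches settle on.
    let expr = expr_fn::encode(col("my_data"), lit("hex"));
    println!("{expr}");
    Ok(())
}
```

This mirrors the intent of `make_package!`: when a feature is disabled, the stub module still compiles with an empty function list, so the same registration code builds either way instead of failing with a cryptic "no function found" error at runtime.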