From cd0af0f3c880a63ed3aa067d69fe66136df7c69c Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Mon, 4 Nov 2024 10:46:40 +0800 Subject: [PATCH 1/5] tmp --- .../src/catalog_common/information_schema.rs | 218 +++++++++++++++++- datafusion/core/src/execution/context/mod.rs | 14 +- .../core/src/execution/session_state.rs | 27 +-- datafusion/execution/src/task.rs | 12 + datafusion/expr-common/src/signature.rs | 56 +++++ datafusion/expr/src/registry.rs | 23 ++ datafusion/expr/src/udf.rs | 13 ++ .../test_files/information_schema.slt | 3 + 8 files changed, 348 insertions(+), 18 deletions(-) diff --git a/datafusion/core/src/catalog_common/information_schema.rs b/datafusion/core/src/catalog_common/information_schema.rs index 180994b1cbe89..bb780b6e6b9f5 100644 --- a/datafusion/core/src/catalog_common/information_schema.rs +++ b/datafusion/core/src/catalog_common/information_schema.rs @@ -28,7 +28,10 @@ use async_trait::async_trait; use datafusion_common::DataFusionError; use std::fmt::Debug; use std::{any::Any, sync::Arc}; - +use std::collections::HashMap; +use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF}; +use datafusion_common::error::Result; +use datafusion_expr::registry::FunctionRegistry; use crate::catalog::{CatalogProviderList, SchemaProvider, TableProvider}; use crate::datasource::streaming::StreamingTable; use crate::execution::context::TaskContext; @@ -46,10 +49,11 @@ pub(crate) const VIEWS: &str = "views"; pub(crate) const COLUMNS: &str = "columns"; pub(crate) const DF_SETTINGS: &str = "df_settings"; pub(crate) const SCHEMATA: &str = "schemata"; +pub(crate) const ROUTINES: &str = "routines"; /// All information schema tables pub const INFORMATION_SCHEMA_TABLES: &[&str] = - &[TABLES, VIEWS, COLUMNS, DF_SETTINGS, SCHEMATA]; + &[TABLES, VIEWS, COLUMNS, DF_SETTINGS, SCHEMATA, ROUTINES]; /// Implements the `information_schema` virtual schema and tables /// @@ -208,6 +212,99 @@ impl InformationSchemaConfig { builder.add_setting(entry); } } + + fn make_routines( + &self, + udfs: &HashMap>, + udafs: &HashMap>, + udwfs: &HashMap>, + config_options: &ConfigOptions, + builder: &mut InformationSchemaRoutinesBuilder, + ) -> Result<()> + { + let catalog_name = &config_options.catalog.default_catalog; + let schema_name = &config_options.catalog.default_schema; + + for (name, udf) in udfs { + let combinations = get_udf_args_and_return_types(udf)?; + for (_, return_type) in combinations { + builder.add_routine( + &catalog_name, + &schema_name, + name, + "FUNCTION", + return_type, + "SCALAR", + udf.documentation().map(|d| d.description.to_string())) + } + } + + for (name, udaf) in udafs { + let combinations = get_udaf_args_and_return_types(udaf)?; + for (_, return_type) in combinations { + builder.add_routine( + &catalog_name, + &schema_name, + name, + "FUNCTION", + return_type, + "AGGREGATE", + udaf.documentation().map(|d| d.description.to_string())) + } + } + + for (name, udwf) in udwfs { + let combinations = get_udwf_args_and_return_types(udwf)?; + for (_, return_type) in combinations { + builder.add_routine( + &catalog_name, + &schema_name, + name, + "FUNCTION", + return_type, + "WINDOW", + udwf.documentation().map(|d| d.description.to_string())) + } + } + Ok(()) + } +} + +/// get the arguments and return types of a UDF +/// returns a tuple of (arg_types, return_type) +fn get_udf_args_and_return_types( + udf: &Arc, +) -> Result, Option<&str>)>> { + let signature = udf.signature(); + let arg_types = signature.type_signature.get_possible_types(); + arg_types.into_iter().map(|arg_types| { + // only handle the function which implemented [`ScalarUDFImpl::return_type`] method + let return_type = udf.return_type(&arg_types).ok().map(|t| t.to_string()); + (arg_types, return_type) + }).collect() +} + +fn get_udaf_args_and_return_types( + udaf: &Arc, +) -> Result, Option<&str>)>> { + let signature = udaf.signature(); + let arg_types = signature.type_signature.get_possible_types(); + arg_types.into_iter().map(|arg_types| { + // only handle the function which implemented [`ScalarUDFImpl::return_type`] method + let return_type = udaf.return_type(&arg_types).ok().map(|t| t.to_string()); + (arg_types, return_type) + }).collect() +} + +fn get_udwf_args_and_return_types( + udwf: &Arc, +) -> Result, Option<&str>)>> { + let signature = udwf.signature(); + let arg_types = signature.type_signature.get_possible_types(); + arg_types.into_iter().map(|arg_types| { + // only handle the function which implemented [`ScalarUDFImpl::return_type`] method + (arg_types, None) + }).collect() } #[async_trait] @@ -234,6 +331,7 @@ impl SchemaProvider for InformationSchemaProvider { VIEWS => Arc::new(InformationSchemaViews::new(config)), DF_SETTINGS => Arc::new(InformationSchemaDfSettings::new(config)), SCHEMATA => Arc::new(InformationSchemata::new(config)), + ROUTINES => Arc::new(InformationSchemaRoutines::new(config)), _ => return Ok(None), }; @@ -819,3 +917,119 @@ impl InformationSchemaDfSettingsBuilder { .unwrap() } } + +#[derive(Debug)] +struct InformationSchemaRoutines { + schema: SchemaRef, + config: InformationSchemaConfig, +} + +impl InformationSchemaRoutines { + fn new(config: InformationSchemaConfig) -> Self { + let schema = Arc::new(Schema::new(vec![ + Field::new("specific_catalog", DataType::Utf8, false), + Field::new("specific_schema", DataType::Utf8, false), + Field::new("specific_name", DataType::Utf8, false), + Field::new("routine_catalog", DataType::Utf8, false), + Field::new("routine_schema", DataType::Utf8, false), + Field::new("routine_name", DataType::Utf8, false), + Field::new("routine_type", DataType::Utf8, false), + Field::new("data_type", DataType::Utf8, false), + Field::new("function_type", DataType::Utf8, true), + Field::new("help_text", DataType::Utf8, true), + ])); + + Self { schema, config } + } + + fn builder(&self) -> InformationSchemaRoutinesBuilder { + InformationSchemaRoutinesBuilder { + schema: self.schema.clone(), + specific_catalog: StringBuilder::new(), + specific_schema: StringBuilder::new(), + routine_catalog: StringBuilder::new(), + routine_schema: StringBuilder::new(), + routine_name: StringBuilder::new(), + routine_type: StringBuilder::new(), + data_type: StringBuilder::new(), + function_type: StringBuilder::new(), + description: StringBuilder::new(), + } + } +} + +struct InformationSchemaRoutinesBuilder { + schema: SchemaRef, + specific_catalog: StringBuilder, + specific_schema: StringBuilder, + routine_catalog: StringBuilder, + routine_schema: StringBuilder, + routine_name: StringBuilder, + routine_type: StringBuilder, + data_type: StringBuilder, + function_type: StringBuilder, + description: StringBuilder, +} + +impl InformationSchemaRoutinesBuilder { + fn add_routine( + &mut self, + catalog_name: impl AsRef, + schema_name: impl AsRef, + routine_name: impl AsRef, + routine_type: impl AsRef, + data_type: Option>, + function_type: impl AsRef, + description: Option>, + ) { + self.specific_catalog.append_value(catalog_name.as_ref()); + self.specific_schema.append_value(schema_name.as_ref()); + self.routine_catalog.append_value(catalog_name.as_ref()); + self.routine_schema.append_value(schema_name.as_ref()); + self.routine_name.append_value(routine_name.as_ref()); + self.routine_type.append_value(routine_type.as_ref()); + self.data_type.append_option(data_type.as_ref()); + self.function_type.append_value(function_type.as_ref()); + self.description.append_option(description); + } + + fn finish(&mut self) -> RecordBatch { + RecordBatch::try_new( + self.schema.clone(), + vec![ + Arc::new(self.specific_catalog.finish()), + Arc::new(self.specific_schema.finish()), + Arc::new(self.routine_catalog.finish()), + Arc::new(self.routine_schema.finish()), + Arc::new(self.routine_name.finish()), + Arc::new(self.routine_type.finish()), + Arc::new(self.data_type.finish()), + Arc::new(self.function_type.finish()), + Arc::new(self.description.finish()), + ], + ) + .unwrap() + } +} + +impl PartitionStream for InformationSchemaRoutines { + fn schema(&self) -> &SchemaRef { + &self.schema + } + + fn execute(&self, ctx: Arc) -> SendableRecordBatchStream { + let config = self.config.clone(); + let mut builder = self.builder(); + Box::pin(RecordBatchStreamAdapter::new( + self.schema.clone(), + futures::stream::once(async move { + let udfs = ctx.scalar_functions(); + let udafs = ctx.aggregate_functions(); + let udwfs = ctx.window_functions(); + let config_options = ctx.session_config().options(); + config.make_routines(&udfs, &udafs, &udwfs, config_options, &mut builder)?; + Ok(builder.finish()) + }), + )) + } +} diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 606759aae5ee0..b46aa414c0c74 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -17,7 +17,7 @@ //! [`SessionContext`] API for registering data sources and executing queries -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::sync::{Arc, Weak}; @@ -1496,6 +1496,18 @@ impl FunctionRegistry for SessionContext { self.state.read().udfs() } + fn scalar_functions(&self) -> &HashMap> { + FunctionRegistry::scalar_functions(&self.state.read()) + } + + fn aggregate_functions(&self) -> &HashMap> { + FunctionRegistry::aggregate_functions(&self.state.read()) + } + + fn window_functions(&self) -> &HashMap> { + FunctionRegistry::window_functions(&self.state.read()) + } + fn udf(&self, name: &str) -> Result> { self.state.read().udf(name) } diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 4953eecd66e39..858fcb3fea7c7 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -860,21 +860,6 @@ impl SessionState { self.catalog_list = catalog_list; } - /// Return reference to scalar_functions - pub fn scalar_functions(&self) -> &HashMap> { - &self.scalar_functions - } - - /// Return reference to aggregate_functions - pub fn aggregate_functions(&self) -> &HashMap> { - &self.aggregate_functions - } - - /// Return reference to window functions - pub fn window_functions(&self) -> &HashMap> { - &self.window_functions - } - /// Return reference to table_functions pub fn table_functions(&self) -> &HashMap> { &self.table_functions @@ -1680,6 +1665,18 @@ impl FunctionRegistry for SessionState { self.scalar_functions.keys().cloned().collect() } + fn scalar_functions(&self) -> &HashMap> { + &self.scalar_functions + } + + fn aggregate_functions(&self) -> &HashMap> { + &self.aggregate_functions + } + + fn window_functions(&self) -> &HashMap> { + &self.window_functions + } + fn udf(&self, name: &str) -> datafusion_common::Result> { let result = self.scalar_functions.get(name); diff --git a/datafusion/execution/src/task.rs b/datafusion/execution/src/task.rs index 57fcac0ee5ab6..1c2f672a08f8d 100644 --- a/datafusion/execution/src/task.rs +++ b/datafusion/execution/src/task.rs @@ -143,6 +143,18 @@ impl FunctionRegistry for TaskContext { self.scalar_functions.keys().cloned().collect() } + fn scalar_functions(&self) -> &HashMap> { + &self.scalar_functions + } + + fn aggregate_functions(&self) -> &HashMap> { + &self.aggregate_functions + } + + fn window_functions(&self) -> &HashMap> { + &self.window_functions + } + fn udf(&self, name: &str) -> Result> { let result = self.scalar_functions.get(name); diff --git a/datafusion/expr-common/src/signature.rs b/datafusion/expr-common/src/signature.rs index 24cb54f634b14..3c5a52f8736c4 100644 --- a/datafusion/expr-common/src/signature.rs +++ b/datafusion/expr-common/src/signature.rs @@ -243,6 +243,26 @@ impl TypeSignature { _ => false, } } + + /// get all possible types for the given `TypeSignature` + pub fn get_possible_types(&self) -> Vec> { + match self { + TypeSignature::Exact(types) => vec![types.clone()], + TypeSignature::OneOf(types) => types + .iter() + .flat_map(|type_sig| type_sig.get_possible_types()) + .collect(), + TypeSignature::Uniform(_, _) + | TypeSignature::Coercible(_) + | TypeSignature::Any(_) + | TypeSignature::Variadic(_) + | TypeSignature::VariadicAny + | TypeSignature::UserDefined + | TypeSignature::ArraySignature(_) + | TypeSignature::Numeric(_) + | TypeSignature::String(_) => vec![], + } + } } /// Defines the supported argument types ([`TypeSignature`]) and [`Volatility`] for a function. @@ -454,4 +474,40 @@ mod tests { < TypeSignature::Exact(vec![DataType::Null]) ); } + + #[test] + fn test_get_possible_types() { + let type_signature = TypeSignature::Exact(vec![DataType::Int32, DataType::Int64]); + let possible_types = type_signature.get_possible_types(); + assert_eq!(possible_types, vec![vec![DataType::Int32, DataType::Int64]]); + + let type_signature = TypeSignature::OneOf(vec![ + TypeSignature::Exact(vec![DataType::Int32, DataType::Int64]), + TypeSignature::Exact(vec![DataType::Float32, DataType::Float64]), + ]); + let possible_types = type_signature.get_possible_types(); + assert_eq!( + possible_types, + vec![ + vec![DataType::Int32, DataType::Int64], + vec![DataType::Float32, DataType::Float64] + ] + ); + + let type_signature = TypeSignature::OneOf(vec![ + TypeSignature::Exact(vec![DataType::Int32, DataType::Int64]), + TypeSignature::Exact(vec![DataType::Float32, DataType::Float64]), + TypeSignature::Exact(vec![DataType::Utf8]), + ]); + let possible_types = type_signature.get_possible_types(); + assert_eq!( + possible_types, + vec![ + vec![DataType::Int32, DataType::Int64], + vec![DataType::Float32, DataType::Float64], + vec![DataType::Utf8] + ] + ); + } + } diff --git a/datafusion/expr/src/registry.rs b/datafusion/expr/src/registry.rs index 6d3457f70d4c7..ce79855dd622a 100644 --- a/datafusion/expr/src/registry.rs +++ b/datafusion/expr/src/registry.rs @@ -27,9 +27,19 @@ use std::sync::Arc; /// A registry knows how to build logical expressions out of user-defined function' names pub trait FunctionRegistry { + #[deprecated(since = "42.2.0", note = "Use `scalar_functions` instead")] /// Set of all available udfs. fn udfs(&self) -> HashSet; + /// Reference Map of all available scalar functions. + fn scalar_functions(&self) -> &HashMap>; + + /// Reference Map of all available aggregate functions. + fn aggregate_functions(&self) -> &HashMap>; + + /// Reference Map of all available window functions. + fn window_functions(&self) -> &HashMap>; + /// Returns a reference to the user defined scalar function (udf) named /// `name`. fn udf(&self, name: &str) -> Result>; @@ -163,6 +173,19 @@ impl FunctionRegistry for MemoryFunctionRegistry { self.udfs.keys().cloned().collect() } + fn scalar_functions(&self) -> &HashMap> { + &self.udfs + } + + fn aggregate_functions(&self) -> &HashMap> { + &self.udafs + } + + fn window_functions(&self) -> &HashMap> { + &self.udwfs + } + + fn udf(&self, name: &str) -> Result> { self.udfs .get(name) diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs index 3759fb18f56df..f66e4482e8681 100644 --- a/datafusion/expr/src/udf.rs +++ b/datafusion/expr/src/udf.rs @@ -164,6 +164,19 @@ impl ScalarUDF { self.inner.signature() } + /// The datatype this function returns given the input argument types. + /// This function is used when the input arguments are [`DataType`]s. + /// + /// # Notes + /// + /// If a function implement [`ScalarUDFImpl::return_type_from_exprs`], + /// its [`ScalarUDFImpl::return_type`] should raise an error. + /// + /// See [`ScalarUDFImpl::return_type`] for more details. + pub fn return_type(&self, arg_types: &[DataType]) -> Result { + self.inner.return_type(arg_types) + } + /// The datatype this function returns given the input argument input types. /// This function is used when the input arguments are [`Expr`]s. /// diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 57bf029a63c1b..c5f00d0f4a835 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -599,3 +599,6 @@ query TTTT SHOW CREATE TABLE abc; ---- datafusion public abc CREATE EXTERNAL TABLE abc STORED AS CSV LOCATION ../../testing/data/csv/aggregate_test_100.csv + +statement ok +SELECT * FROM information_schema.routines; From f358431f8c743a80c48ed94f5ab4dafb82b63d63 Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Tue, 5 Nov 2024 10:26:39 +0800 Subject: [PATCH 2/5] introduce routines table --- .../src/catalog_common/information_schema.rs | 142 ++++++++++++------ datafusion/core/src/execution/context/mod.rs | 14 +- .../core/src/execution/session_state.rs | 18 +-- datafusion/execution/src/task.rs | 31 ++-- datafusion/expr-common/src/signature.rs | 2 +- datafusion/expr/src/registry.rs | 23 --- .../test_files/information_schema.slt | 29 +++- .../information_schema_multiple_catalogs.slt | 4 + .../information_schema_table_types.slt | 1 + 9 files changed, 145 insertions(+), 119 deletions(-) diff --git a/datafusion/core/src/catalog_common/information_schema.rs b/datafusion/core/src/catalog_common/information_schema.rs index bb780b6e6b9f5..56437414c794a 100644 --- a/datafusion/core/src/catalog_common/information_schema.rs +++ b/datafusion/core/src/catalog_common/information_schema.rs @@ -19,19 +19,6 @@ //! //! [Information Schema]: https://en.wikipedia.org/wiki/Information_schema -use arrow::{ - array::{StringBuilder, UInt64Builder}, - datatypes::{DataType, Field, Schema, SchemaRef}, - record_batch::RecordBatch, -}; -use async_trait::async_trait; -use datafusion_common::DataFusionError; -use std::fmt::Debug; -use std::{any::Any, sync::Arc}; -use std::collections::HashMap; -use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF}; -use datafusion_common::error::Result; -use datafusion_expr::registry::FunctionRegistry; use crate::catalog::{CatalogProviderList, SchemaProvider, TableProvider}; use crate::datasource::streaming::StreamingTable; use crate::execution::context::TaskContext; @@ -42,6 +29,18 @@ use crate::{ config::{ConfigEntry, ConfigOptions}, physical_plan::streaming::PartitionStream, }; +use arrow::{ + array::{StringBuilder, UInt64Builder}, + datatypes::{DataType, Field, Schema, SchemaRef}, + record_batch::RecordBatch, +}; +use async_trait::async_trait; +use datafusion_common::error::Result; +use datafusion_common::DataFusionError; +use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF}; +use std::collections::{HashMap, HashSet}; +use std::fmt::Debug; +use std::{any::Any, sync::Arc}; pub(crate) const INFORMATION_SCHEMA: &str = "information_schema"; pub(crate) const TABLES: &str = "tables"; @@ -220,14 +219,15 @@ impl InformationSchemaConfig { udwfs: &HashMap>, config_options: &ConfigOptions, builder: &mut InformationSchemaRoutinesBuilder, - ) -> Result<()> - { + ) -> Result<()> { let catalog_name = &config_options.catalog.default_catalog; let schema_name = &config_options.catalog.default_schema; for (name, udf) in udfs { - let combinations = get_udf_args_and_return_types(udf)?; - for (_, return_type) in combinations { + let return_types = get_udf_args_and_return_types(udf)? + .into_iter().map(|(_, return_type)| return_type) + .collect::>(); + for return_type in return_types { builder.add_routine( &catalog_name, &schema_name, @@ -235,13 +235,16 @@ impl InformationSchemaConfig { "FUNCTION", return_type, "SCALAR", - udf.documentation().map(|d| d.description.to_string())) + udf.documentation().map(|d| d.description.to_string()), + ) } } for (name, udaf) in udafs { - let combinations = get_udaf_args_and_return_types(udaf)?; - for (_, return_type) in combinations { + let return_types = get_udaf_args_and_return_types(udaf)? + .into_iter().map(|(_, return_type)| return_type) + .collect::>(); + for return_type in return_types { builder.add_routine( &catalog_name, &schema_name, @@ -249,13 +252,16 @@ impl InformationSchemaConfig { "FUNCTION", return_type, "AGGREGATE", - udaf.documentation().map(|d| d.description.to_string())) + udaf.documentation().map(|d| d.description.to_string()), + ) } } for (name, udwf) in udwfs { - let combinations = get_udwf_args_and_return_types(udwf)?; - for (_, return_type) in combinations { + let return_types = get_udwf_args_and_return_types(udwf)? + .into_iter().map(|(_, return_type)| return_type) + .collect::>(); + for return_type in return_types { builder.add_routine( &catalog_name, &schema_name, @@ -263,7 +269,8 @@ impl InformationSchemaConfig { "FUNCTION", return_type, "WINDOW", - udwf.documentation().map(|d| d.description.to_string())) + udwf.documentation().map(|d| d.description.to_string()), + ) } } Ok(()) @@ -274,37 +281,70 @@ impl InformationSchemaConfig { /// returns a tuple of (arg_types, return_type) fn get_udf_args_and_return_types( udf: &Arc, -) -> Result, Option<&str>)>> { +) -> Result, Option)>> { let signature = udf.signature(); let arg_types = signature.type_signature.get_possible_types(); - arg_types.into_iter().map(|arg_types| { - // only handle the function which implemented [`ScalarUDFImpl::return_type`] method - let return_type = udf.return_type(&arg_types).ok().map(|t| t.to_string()); - (arg_types, return_type) - }).collect() + if arg_types.is_empty() { + Ok(vec![(vec![], None)]) + } else { + Ok(arg_types + .into_iter() + .map(|arg_types| { + // only handle the function which implemented [`ScalarUDFImpl::return_type`] method + let return_type = udf.return_type(&arg_types).ok().map(|t| t.to_string()); + let arg_types = arg_types + .into_iter() + .map(|t| t.to_string()) + .collect::>(); + (arg_types, return_type) + }) + .collect::>()) + } } fn get_udaf_args_and_return_types( udaf: &Arc, -) -> Result, Option<&str>)>> { +) -> Result, Option)>> { let signature = udaf.signature(); let arg_types = signature.type_signature.get_possible_types(); - arg_types.into_iter().map(|arg_types| { - // only handle the function which implemented [`ScalarUDFImpl::return_type`] method - let return_type = udaf.return_type(&arg_types).ok().map(|t| t.to_string()); - (arg_types, return_type) - }).collect() + if arg_types.is_empty() { + Ok(vec![(vec![], None)]) + } else { + Ok(arg_types + .into_iter() + .map(|arg_types| { + // only handle the function which implemented [`ScalarUDFImpl::return_type`] method + let return_type = udaf.return_type(&arg_types).ok().map(|t| t.to_string()); + let arg_types = arg_types + .into_iter() + .map(|t| t.to_string()) + .collect::>(); + (arg_types, return_type) + }) + .collect::>()) + } } fn get_udwf_args_and_return_types( udwf: &Arc, -) -> Result, Option<&str>)>> { +) -> Result, Option)>> { let signature = udwf.signature(); let arg_types = signature.type_signature.get_possible_types(); - arg_types.into_iter().map(|arg_types| { - // only handle the function which implemented [`ScalarUDFImpl::return_type`] method - (arg_types, None) - }).collect() + if arg_types.is_empty() { + Ok(vec![(vec![], None)]) + } else { + Ok(arg_types + .into_iter() + .map(|arg_types| { + // only handle the function which implemented [`ScalarUDFImpl::return_type`] method + let arg_types = arg_types + .into_iter() + .map(|t| t.to_string()) + .collect::>(); + (arg_types, None) + }) + .collect::>()) + } } #[async_trait] @@ -934,9 +974,9 @@ impl InformationSchemaRoutines { Field::new("routine_schema", DataType::Utf8, false), Field::new("routine_name", DataType::Utf8, false), Field::new("routine_type", DataType::Utf8, false), - Field::new("data_type", DataType::Utf8, false), + Field::new("data_type", DataType::Utf8, true), Field::new("function_type", DataType::Utf8, true), - Field::new("help_text", DataType::Utf8, true), + Field::new("description", DataType::Utf8, true), ])); Self { schema, config } @@ -947,6 +987,7 @@ impl InformationSchemaRoutines { schema: self.schema.clone(), specific_catalog: StringBuilder::new(), specific_schema: StringBuilder::new(), + specific_name: StringBuilder::new(), routine_catalog: StringBuilder::new(), routine_schema: StringBuilder::new(), routine_name: StringBuilder::new(), @@ -962,6 +1003,7 @@ struct InformationSchemaRoutinesBuilder { schema: SchemaRef, specific_catalog: StringBuilder, specific_schema: StringBuilder, + specific_name: StringBuilder, routine_catalog: StringBuilder, routine_schema: StringBuilder, routine_name: StringBuilder, @@ -984,6 +1026,7 @@ impl InformationSchemaRoutinesBuilder { ) { self.specific_catalog.append_value(catalog_name.as_ref()); self.specific_schema.append_value(schema_name.as_ref()); + self.specific_name.append_value(routine_name.as_ref()); self.routine_catalog.append_value(catalog_name.as_ref()); self.routine_schema.append_value(schema_name.as_ref()); self.routine_name.append_value(routine_name.as_ref()); @@ -999,6 +1042,7 @@ impl InformationSchemaRoutinesBuilder { vec![ Arc::new(self.specific_catalog.finish()), Arc::new(self.specific_schema.finish()), + Arc::new(self.specific_name.finish()), Arc::new(self.routine_catalog.finish()), Arc::new(self.routine_schema.finish()), Arc::new(self.routine_name.finish()), @@ -1023,11 +1067,13 @@ impl PartitionStream for InformationSchemaRoutines { Box::pin(RecordBatchStreamAdapter::new( self.schema.clone(), futures::stream::once(async move { - let udfs = ctx.scalar_functions(); - let udafs = ctx.aggregate_functions(); - let udwfs = ctx.window_functions(); - let config_options = ctx.session_config().options(); - config.make_routines(&udfs, &udafs, &udwfs, config_options, &mut builder)?; + config.make_routines( + ctx.scalar_functions(), + ctx.aggregate_functions(), + ctx.window_functions(), + ctx.session_config().options(), + &mut builder, + )?; Ok(builder.finish()) }), )) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index b46aa414c0c74..606759aae5ee0 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -17,7 +17,7 @@ //! [`SessionContext`] API for registering data sources and executing queries -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::fmt::Debug; use std::sync::{Arc, Weak}; @@ -1496,18 +1496,6 @@ impl FunctionRegistry for SessionContext { self.state.read().udfs() } - fn scalar_functions(&self) -> &HashMap> { - FunctionRegistry::scalar_functions(&self.state.read()) - } - - fn aggregate_functions(&self) -> &HashMap> { - FunctionRegistry::aggregate_functions(&self.state.read()) - } - - fn window_functions(&self) -> &HashMap> { - FunctionRegistry::window_functions(&self.state.read()) - } - fn udf(&self, name: &str) -> Result> { self.state.read().udf(name) } diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 858fcb3fea7c7..f3628f1e2adad 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -227,15 +227,15 @@ impl Session for SessionState { } fn scalar_functions(&self) -> &HashMap> { - self.scalar_functions() + &self.scalar_functions } fn aggregate_functions(&self) -> &HashMap> { - self.aggregate_functions() + &self.aggregate_functions } fn window_functions(&self) -> &HashMap> { - self.window_functions() + &self.window_functions } fn runtime_env(&self) -> &Arc { @@ -1665,18 +1665,6 @@ impl FunctionRegistry for SessionState { self.scalar_functions.keys().cloned().collect() } - fn scalar_functions(&self) -> &HashMap> { - &self.scalar_functions - } - - fn aggregate_functions(&self) -> &HashMap> { - &self.aggregate_functions - } - - fn window_functions(&self) -> &HashMap> { - &self.window_functions - } - fn udf(&self, name: &str) -> datafusion_common::Result> { let result = self.scalar_functions.get(name); diff --git a/datafusion/execution/src/task.rs b/datafusion/execution/src/task.rs index 1c2f672a08f8d..35494443b4760 100644 --- a/datafusion/execution/src/task.rs +++ b/datafusion/execution/src/task.rs @@ -15,11 +15,6 @@ // specific language governing permissions and limitations // under the License. -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; - use crate::{ config::SessionConfig, memory_pool::MemoryPool, @@ -29,6 +24,8 @@ use crate::{ use datafusion_common::{plan_datafusion_err, DataFusionError, Result}; use datafusion_expr::planner::ExprPlanner; use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF}; +use std::collections::HashSet; +use std::{collections::HashMap, sync::Arc}; /// Task Execution Context /// @@ -125,6 +122,18 @@ impl TaskContext { Arc::clone(&self.runtime) } + pub fn scalar_functions(&self) -> &HashMap> { + &self.scalar_functions + } + + pub fn aggregate_functions(&self) -> &HashMap> { + &self.aggregate_functions + } + + pub fn window_functions(&self) -> &HashMap> { + &self.window_functions + } + /// Update the [`SessionConfig`] pub fn with_session_config(mut self, session_config: SessionConfig) -> Self { self.session_config = session_config; @@ -143,18 +152,6 @@ impl FunctionRegistry for TaskContext { self.scalar_functions.keys().cloned().collect() } - fn scalar_functions(&self) -> &HashMap> { - &self.scalar_functions - } - - fn aggregate_functions(&self) -> &HashMap> { - &self.aggregate_functions - } - - fn window_functions(&self) -> &HashMap> { - &self.window_functions - } - fn udf(&self, name: &str) -> Result> { let result = self.scalar_functions.get(name); diff --git a/datafusion/expr-common/src/signature.rs b/datafusion/expr-common/src/signature.rs index 3c5a52f8736c4..31d8b90d72780 100644 --- a/datafusion/expr-common/src/signature.rs +++ b/datafusion/expr-common/src/signature.rs @@ -252,6 +252,7 @@ impl TypeSignature { .iter() .flat_map(|type_sig| type_sig.get_possible_types()) .collect(), + // TODO: Implement for other types TypeSignature::Uniform(_, _) | TypeSignature::Coercible(_) | TypeSignature::Any(_) @@ -509,5 +510,4 @@ mod tests { ] ); } - } diff --git a/datafusion/expr/src/registry.rs b/datafusion/expr/src/registry.rs index ce79855dd622a..6d3457f70d4c7 100644 --- a/datafusion/expr/src/registry.rs +++ b/datafusion/expr/src/registry.rs @@ -27,19 +27,9 @@ use std::sync::Arc; /// A registry knows how to build logical expressions out of user-defined function' names pub trait FunctionRegistry { - #[deprecated(since = "42.2.0", note = "Use `scalar_functions` instead")] /// Set of all available udfs. fn udfs(&self) -> HashSet; - /// Reference Map of all available scalar functions. - fn scalar_functions(&self) -> &HashMap>; - - /// Reference Map of all available aggregate functions. - fn aggregate_functions(&self) -> &HashMap>; - - /// Reference Map of all available window functions. - fn window_functions(&self) -> &HashMap>; - /// Returns a reference to the user defined scalar function (udf) named /// `name`. fn udf(&self, name: &str) -> Result>; @@ -173,19 +163,6 @@ impl FunctionRegistry for MemoryFunctionRegistry { self.udfs.keys().cloned().collect() } - fn scalar_functions(&self) -> &HashMap> { - &self.udfs - } - - fn aggregate_functions(&self) -> &HashMap> { - &self.udafs - } - - fn window_functions(&self) -> &HashMap> { - &self.udwfs - } - - fn udf(&self, name: &str) -> Result> { self.udfs .get(name) diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index c5f00d0f4a835..452f2a71135b8 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -39,6 +39,7 @@ SELECT * from information_schema.tables; ---- datafusion information_schema columns VIEW datafusion information_schema df_settings VIEW +datafusion information_schema routines VIEW datafusion information_schema schemata VIEW datafusion information_schema tables VIEW datafusion information_schema views VIEW @@ -83,6 +84,7 @@ SELECT * from information_schema.tables; ---- datafusion information_schema columns VIEW datafusion information_schema df_settings VIEW +datafusion information_schema routines VIEW datafusion information_schema schemata VIEW datafusion information_schema tables VIEW datafusion information_schema views VIEW @@ -97,6 +99,7 @@ SELECT * from information_schema.tables; ---- datafusion information_schema columns VIEW datafusion information_schema df_settings VIEW +datafusion information_schema routines VIEW datafusion information_schema schemata VIEW datafusion information_schema tables VIEW datafusion information_schema views VIEW @@ -108,6 +111,7 @@ SELECT * from information_schema.tables WHERE tables.table_schema='information_s ---- datafusion information_schema columns VIEW datafusion information_schema df_settings VIEW +datafusion information_schema routines VIEW datafusion information_schema schemata VIEW datafusion information_schema tables VIEW datafusion information_schema views VIEW @@ -117,6 +121,7 @@ SELECT * from information_schema.tables WHERE information_schema.tables.table_sc ---- datafusion information_schema columns VIEW datafusion information_schema df_settings VIEW +datafusion information_schema routines VIEW datafusion information_schema schemata VIEW datafusion information_schema tables VIEW datafusion information_schema views VIEW @@ -126,6 +131,7 @@ SELECT * from information_schema.tables WHERE datafusion.information_schema.tabl ---- datafusion information_schema columns VIEW datafusion information_schema df_settings VIEW +datafusion information_schema routines VIEW datafusion information_schema schemata VIEW datafusion information_schema tables VIEW datafusion information_schema views VIEW @@ -444,6 +450,7 @@ SHOW TABLES ---- datafusion information_schema columns VIEW datafusion information_schema df_settings VIEW +datafusion information_schema routines VIEW datafusion information_schema schemata VIEW datafusion information_schema tables VIEW datafusion information_schema views VIEW @@ -600,5 +607,23 @@ SHOW CREATE TABLE abc; ---- datafusion public abc CREATE EXTERNAL TABLE abc STORED AS CSV LOCATION ../../testing/data/csv/aggregate_test_100.csv -statement ok -SELECT * FROM information_schema.routines; +# string_agg has different arg_types but same return type. Test avoiding duplicate entries for the same function. +query TTT +select routine_name, data_type, function_type from information_schema.routines where routine_name = 'string_agg'; +---- +string_agg LargeUtf8 AGGREGATE + +# test every function type are included in the result +query TTTTTTTTTT +select * from information_schema.routines where routine_name = 'date_trunc' OR routine_name = 'string_agg' OR routine_name = 'rank'; +---- +datafusion public date_trunc datafusion public date_trunc FUNCTION Timestamp(Second, None) SCALAR Truncates a timestamp value to a specified precision. +datafusion public date_trunc datafusion public date_trunc FUNCTION Timestamp(Millisecond, Some("+TZ")) SCALAR Truncates a timestamp value to a specified precision. +datafusion public date_trunc datafusion public date_trunc FUNCTION Timestamp(Millisecond, None) SCALAR Truncates a timestamp value to a specified precision. +datafusion public date_trunc datafusion public date_trunc FUNCTION Timestamp(Second, Some("+TZ")) SCALAR Truncates a timestamp value to a specified precision. +datafusion public date_trunc datafusion public date_trunc FUNCTION Timestamp(Microsecond, Some("+TZ")) SCALAR Truncates a timestamp value to a specified precision. +datafusion public date_trunc datafusion public date_trunc FUNCTION Timestamp(Microsecond, None) SCALAR Truncates a timestamp value to a specified precision. +datafusion public date_trunc datafusion public date_trunc FUNCTION Timestamp(Nanosecond, Some("+TZ")) SCALAR Truncates a timestamp value to a specified precision. +datafusion public date_trunc datafusion public date_trunc FUNCTION Timestamp(Nanosecond, None) SCALAR Truncates a timestamp value to a specified precision. +datafusion public string_agg datafusion public string_agg FUNCTION LargeUtf8 AGGREGATE Concatenates the values of string expressions and places separator values between them. +datafusion public rank datafusion public rank FUNCTION NULL WINDOW Returns the rank of the current row within its partition, allowing gaps between ranks. This function provides a ranking similar to `row_number`, but skips ranks for identical values. diff --git a/datafusion/sqllogictest/test_files/information_schema_multiple_catalogs.slt b/datafusion/sqllogictest/test_files/information_schema_multiple_catalogs.slt index 99a3820c2c4c0..988a4275c6e31 100644 --- a/datafusion/sqllogictest/test_files/information_schema_multiple_catalogs.slt +++ b/datafusion/sqllogictest/test_files/information_schema_multiple_catalogs.slt @@ -35,6 +35,7 @@ SELECT * from information_schema.tables; ---- datafusion information_schema columns VIEW datafusion information_schema df_settings VIEW +datafusion information_schema routines VIEW datafusion information_schema schemata VIEW datafusion information_schema tables VIEW datafusion information_schema views VIEW @@ -80,11 +81,13 @@ SELECT * from information_schema.tables; ---- datafusion information_schema columns VIEW datafusion information_schema df_settings VIEW +datafusion information_schema routines VIEW datafusion information_schema schemata VIEW datafusion information_schema tables VIEW datafusion information_schema views VIEW my_catalog information_schema columns VIEW my_catalog information_schema df_settings VIEW +my_catalog information_schema routines VIEW my_catalog information_schema schemata VIEW my_catalog information_schema tables VIEW my_catalog information_schema views VIEW @@ -92,6 +95,7 @@ my_catalog my_schema t1 BASE TABLE my_catalog my_schema t2 BASE TABLE my_other_catalog information_schema columns VIEW my_other_catalog information_schema df_settings VIEW +my_other_catalog information_schema routines VIEW my_other_catalog information_schema schemata VIEW my_other_catalog information_schema tables VIEW my_other_catalog information_schema views VIEW diff --git a/datafusion/sqllogictest/test_files/information_schema_table_types.slt b/datafusion/sqllogictest/test_files/information_schema_table_types.slt index 3bcab07898905..8a1a94c6a026d 100644 --- a/datafusion/sqllogictest/test_files/information_schema_table_types.slt +++ b/datafusion/sqllogictest/test_files/information_schema_table_types.slt @@ -36,6 +36,7 @@ SELECT * from information_schema.tables; ---- datafusion information_schema columns VIEW datafusion information_schema df_settings VIEW +datafusion information_schema routines VIEW datafusion information_schema schemata VIEW datafusion information_schema tables VIEW datafusion information_schema views VIEW From 0ef6eb101d6517967d7d52685d77619e2129841b Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Tue, 5 Nov 2024 10:56:14 +0800 Subject: [PATCH 3/5] add is_deterministic field --- .../src/catalog_common/information_schema.rs | 43 +++++++++++++------ .../test_files/information_schema.slt | 27 +++++++----- .../substrait/src/logical_plan/consumer.rs | 1 + 3 files changed, 48 insertions(+), 23 deletions(-) diff --git a/datafusion/core/src/catalog_common/information_schema.rs b/datafusion/core/src/catalog_common/information_schema.rs index 56437414c794a..53c1a8b11e1cc 100644 --- a/datafusion/core/src/catalog_common/information_schema.rs +++ b/datafusion/core/src/catalog_common/information_schema.rs @@ -22,7 +22,7 @@ use crate::catalog::{CatalogProviderList, SchemaProvider, TableProvider}; use crate::datasource::streaming::StreamingTable; use crate::execution::context::TaskContext; -use crate::logical_expr::TableType; +use crate::logical_expr::{TableType, Volatility}; use crate::physical_plan::stream::RecordBatchStreamAdapter; use crate::physical_plan::SendableRecordBatchStream; use crate::{ @@ -34,10 +34,11 @@ use arrow::{ datatypes::{DataType, Field, Schema, SchemaRef}, record_batch::RecordBatch, }; +use arrow_array::builder::BooleanBuilder; use async_trait::async_trait; use datafusion_common::error::Result; use datafusion_common::DataFusionError; -use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF}; +use datafusion_expr::{AggregateUDF, ScalarUDF, Signature, WindowUDF}; use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::{any::Any, sync::Arc}; @@ -225,14 +226,16 @@ impl InformationSchemaConfig { for (name, udf) in udfs { let return_types = get_udf_args_and_return_types(udf)? - .into_iter().map(|(_, return_type)| return_type) + .into_iter() + .map(|(_, return_type)| return_type) .collect::>(); for return_type in return_types { builder.add_routine( - &catalog_name, - &schema_name, + catalog_name, + schema_name, name, "FUNCTION", + Self::is_deterministic(udf.signature()), return_type, "SCALAR", udf.documentation().map(|d| d.description.to_string()), @@ -242,14 +245,16 @@ impl InformationSchemaConfig { for (name, udaf) in udafs { let return_types = get_udaf_args_and_return_types(udaf)? - .into_iter().map(|(_, return_type)| return_type) + .into_iter() + .map(|(_, return_type)| return_type) .collect::>(); for return_type in return_types { builder.add_routine( - &catalog_name, - &schema_name, + catalog_name, + schema_name, name, "FUNCTION", + Self::is_deterministic(udaf.signature()), return_type, "AGGREGATE", udaf.documentation().map(|d| d.description.to_string()), @@ -259,14 +264,16 @@ impl InformationSchemaConfig { for (name, udwf) in udwfs { let return_types = get_udwf_args_and_return_types(udwf)? - .into_iter().map(|(_, return_type)| return_type) + .into_iter() + .map(|(_, return_type)| return_type) .collect::>(); for return_type in return_types { builder.add_routine( - &catalog_name, - &schema_name, + catalog_name, + schema_name, name, "FUNCTION", + Self::is_deterministic(udwf.signature()), return_type, "WINDOW", udwf.documentation().map(|d| d.description.to_string()), @@ -275,6 +282,10 @@ impl InformationSchemaConfig { } Ok(()) } + + fn is_deterministic(signature: &Signature) -> bool { + signature.volatility == Volatility::Immutable + } } /// get the arguments and return types of a UDF @@ -314,7 +325,8 @@ fn get_udaf_args_and_return_types( .into_iter() .map(|arg_types| { // only handle the function which implemented [`ScalarUDFImpl::return_type`] method - let return_type = udaf.return_type(&arg_types).ok().map(|t| t.to_string()); + let return_type = + udaf.return_type(&arg_types).ok().map(|t| t.to_string()); let arg_types = arg_types .into_iter() .map(|t| t.to_string()) @@ -974,6 +986,7 @@ impl InformationSchemaRoutines { Field::new("routine_schema", DataType::Utf8, false), Field::new("routine_name", DataType::Utf8, false), Field::new("routine_type", DataType::Utf8, false), + Field::new("is_deterministic", DataType::Boolean, true), Field::new("data_type", DataType::Utf8, true), Field::new("function_type", DataType::Utf8, true), Field::new("description", DataType::Utf8, true), @@ -992,6 +1005,7 @@ impl InformationSchemaRoutines { routine_schema: StringBuilder::new(), routine_name: StringBuilder::new(), routine_type: StringBuilder::new(), + is_deterministic: BooleanBuilder::new(), data_type: StringBuilder::new(), function_type: StringBuilder::new(), description: StringBuilder::new(), @@ -1008,18 +1022,21 @@ struct InformationSchemaRoutinesBuilder { routine_schema: StringBuilder, routine_name: StringBuilder, routine_type: StringBuilder, + is_deterministic: BooleanBuilder, data_type: StringBuilder, function_type: StringBuilder, description: StringBuilder, } impl InformationSchemaRoutinesBuilder { + #[allow(clippy::too_many_arguments)] fn add_routine( &mut self, catalog_name: impl AsRef, schema_name: impl AsRef, routine_name: impl AsRef, routine_type: impl AsRef, + is_deterministic: bool, data_type: Option>, function_type: impl AsRef, description: Option>, @@ -1031,6 +1048,7 @@ impl InformationSchemaRoutinesBuilder { self.routine_schema.append_value(schema_name.as_ref()); self.routine_name.append_value(routine_name.as_ref()); self.routine_type.append_value(routine_type.as_ref()); + self.is_deterministic.append_value(is_deterministic); self.data_type.append_option(data_type.as_ref()); self.function_type.append_value(function_type.as_ref()); self.description.append_option(description); @@ -1047,6 +1065,7 @@ impl InformationSchemaRoutinesBuilder { Arc::new(self.routine_schema.finish()), Arc::new(self.routine_name.finish()), Arc::new(self.routine_type.finish()), + Arc::new(self.is_deterministic.finish()), Arc::new(self.data_type.finish()), Arc::new(self.function_type.finish()), Arc::new(self.description.finish()), diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 452f2a71135b8..170fcc78fa85e 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -614,16 +614,21 @@ select routine_name, data_type, function_type from information_schema.routines w string_agg LargeUtf8 AGGREGATE # test every function type are included in the result -query TTTTTTTTTT +query TTTTTTTBTTT rowsort select * from information_schema.routines where routine_name = 'date_trunc' OR routine_name = 'string_agg' OR routine_name = 'rank'; ---- -datafusion public date_trunc datafusion public date_trunc FUNCTION Timestamp(Second, None) SCALAR Truncates a timestamp value to a specified precision. -datafusion public date_trunc datafusion public date_trunc FUNCTION Timestamp(Millisecond, Some("+TZ")) SCALAR Truncates a timestamp value to a specified precision. -datafusion public date_trunc datafusion public date_trunc FUNCTION Timestamp(Millisecond, None) SCALAR Truncates a timestamp value to a specified precision. -datafusion public date_trunc datafusion public date_trunc FUNCTION Timestamp(Second, Some("+TZ")) SCALAR Truncates a timestamp value to a specified precision. -datafusion public date_trunc datafusion public date_trunc FUNCTION Timestamp(Microsecond, Some("+TZ")) SCALAR Truncates a timestamp value to a specified precision. -datafusion public date_trunc datafusion public date_trunc FUNCTION Timestamp(Microsecond, None) SCALAR Truncates a timestamp value to a specified precision. -datafusion public date_trunc datafusion public date_trunc FUNCTION Timestamp(Nanosecond, Some("+TZ")) SCALAR Truncates a timestamp value to a specified precision. -datafusion public date_trunc datafusion public date_trunc FUNCTION Timestamp(Nanosecond, None) SCALAR Truncates a timestamp value to a specified precision. -datafusion public string_agg datafusion public string_agg FUNCTION LargeUtf8 AGGREGATE Concatenates the values of string expressions and places separator values between them. -datafusion public rank datafusion public rank FUNCTION NULL WINDOW Returns the rank of the current row within its partition, allowing gaps between ranks. This function provides a ranking similar to `row_number`, but skips ranks for identical values. +datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Microsecond, None) SCALAR Truncates a timestamp value to a specified precision. +datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Microsecond, Some("+TZ")) SCALAR Truncates a timestamp value to a specified precision. +datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Millisecond, None) SCALAR Truncates a timestamp value to a specified precision. +datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Millisecond, Some("+TZ")) SCALAR Truncates a timestamp value to a specified precision. +datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Nanosecond, None) SCALAR Truncates a timestamp value to a specified precision. +datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Nanosecond, Some("+TZ")) SCALAR Truncates a timestamp value to a specified precision. +datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Second, None) SCALAR Truncates a timestamp value to a specified precision. +datafusion public date_trunc datafusion public date_trunc FUNCTION true Timestamp(Second, Some("+TZ")) SCALAR Truncates a timestamp value to a specified precision. +datafusion public rank datafusion public rank FUNCTION true NULL WINDOW Returns the rank of the current row within its partition, allowing gaps between ranks. This function provides a ranking similar to `row_number`, but skips ranks for identical values. +datafusion public string_agg datafusion public string_agg FUNCTION true LargeUtf8 AGGREGATE Concatenates the values of string expressions and places separator values between them. + +query B +select is_deterministic from information_schema.routines where routine_name = 'now'; +---- +false diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index 5f1824bc4b303..6025c02e946e3 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -97,6 +97,7 @@ use substrait::proto::{ AggregateFunction, Expression, NamedStruct, Plan, Rel, Type, }; use substrait::proto::{ExtendedExpression, FunctionArgument, SortField}; +use datafusion::catalog::Session; // Substrait PrecisionTimestampTz indicates that the timestamp is relative to UTC, which // is the same as the expectation for any non-empty timezone in DF, so any non-empty timezone From 3f5412fa06c9121d0fb5668ffc3098651fa6df9f Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Tue, 5 Nov 2024 11:02:24 +0800 Subject: [PATCH 4/5] cargo fmt --- datafusion/substrait/src/logical_plan/consumer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index 6025c02e946e3..fd265f97b9934 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -53,6 +53,7 @@ use crate::variation_const::{ TIMESTAMP_SECOND_TYPE_VARIATION_REF, }; use datafusion::arrow::array::{new_empty_array, AsArray}; +use datafusion::catalog::Session; use datafusion::common::scalar::ScalarStructBuilder; use datafusion::dataframe::DataFrame; use datafusion::logical_expr::expr::InList; @@ -97,7 +98,6 @@ use substrait::proto::{ AggregateFunction, Expression, NamedStruct, Plan, Rel, Type, }; use substrait::proto::{ExtendedExpression, FunctionArgument, SortField}; -use datafusion::catalog::Session; // Substrait PrecisionTimestampTz indicates that the timestamp is relative to UTC, which // is the same as the expectation for any non-empty timezone in DF, so any non-empty timezone From 1a949dbb0fa1840830b0bf81cb48624e14851689 Mon Sep 17 00:00:00 2001 From: Jia-Xuan Liu Date: Tue, 5 Nov 2024 11:13:34 +0800 Subject: [PATCH 5/5] rollback the session_state changed --- datafusion/core/src/execution/session_state.rs | 15 +++++++++++++++ datafusion/substrait/src/logical_plan/consumer.rs | 1 - 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index ea7e8626bae21..e8ee983c00fde 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -860,6 +860,21 @@ impl SessionState { self.catalog_list = catalog_list; } + /// Return reference to scalar_functions + pub fn scalar_functions(&self) -> &HashMap> { + &self.scalar_functions + } + + /// Return reference to aggregate_functions + pub fn aggregate_functions(&self) -> &HashMap> { + &self.aggregate_functions + } + + /// Return reference to window functions + pub fn window_functions(&self) -> &HashMap> { + &self.window_functions + } + /// Return reference to table_functions pub fn table_functions(&self) -> &HashMap> { &self.table_functions diff --git a/datafusion/substrait/src/logical_plan/consumer.rs b/datafusion/substrait/src/logical_plan/consumer.rs index 92f4fc3d09a77..890da7361d7c8 100644 --- a/datafusion/substrait/src/logical_plan/consumer.rs +++ b/datafusion/substrait/src/logical_plan/consumer.rs @@ -55,7 +55,6 @@ use crate::variation_const::{ }; use datafusion::arrow::array::{new_empty_array, AsArray}; use datafusion::arrow::temporal_conversions::NANOSECONDS; -use datafusion::catalog::Session; use datafusion::common::scalar::ScalarStructBuilder; use datafusion::dataframe::DataFrame; use datafusion::logical_expr::builder::project;