From e995629c67ee9d18ba57c61fbc220786986c1e2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20Fern=C3=A1ndez?= Date: Tue, 9 Sep 2025 11:14:06 +0200 Subject: [PATCH 01/14] impl try_parse_url spark function --- datafusion/spark/src/function/url/mod.rs | 5 +- .../spark/src/function/url/parse_url.rs | 320 ++++++++++++++++-- .../spark/src/function/url/try_parse_url.rs | 84 +++++ .../test_files/spark/url/try_parse_url.slt | 72 ++++ 4 files changed, 454 insertions(+), 27 deletions(-) create mode 100644 datafusion/spark/src/function/url/try_parse_url.rs create mode 100644 datafusion/sqllogictest/test_files/spark/url/try_parse_url.slt diff --git a/datafusion/spark/src/function/url/mod.rs b/datafusion/spark/src/function/url/mod.rs index 7c959572a8263..3aabf6024eb8b 100644 --- a/datafusion/spark/src/function/url/mod.rs +++ b/datafusion/spark/src/function/url/mod.rs @@ -20,15 +20,18 @@ use datafusion_functions::make_udf_function; use std::sync::Arc; pub mod parse_url; +pub mod try_parse_url; make_udf_function!(parse_url::ParseUrl, parse_url); +make_udf_function!(try_parse_url::TryParseUrl, try_parse_url); pub mod expr_fn { use datafusion_functions::export_functions; export_functions!((parse_url, "Extracts a part from a URL.", args)); + export_functions!((try_parse_url, "This is a special version of parse_url that performs the same operation, but returns a NULL value instead of raising an error if the parsing cannot be performed.", args)); } pub fn functions() -> Vec> { - vec![parse_url()] + vec![parse_url(), try_parse_url()] } diff --git a/datafusion/spark/src/function/url/parse_url.rs b/datafusion/spark/src/function/url/parse_url.rs index 5c3b1d7d3edaf..07b99530c9575 100644 --- a/datafusion/spark/src/function/url/parse_url.rs +++ b/datafusion/spark/src/function/url/parse_url.rs @@ -27,8 +27,7 @@ use datafusion_common::cast::{ }; use datafusion_common::{exec_datafusion_err, exec_err, plan_err, Result}; use datafusion_expr::{ - ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, - Volatility, + ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, }; use datafusion_functions::utils::make_scalar_function; use url::Url; @@ -47,23 +46,7 @@ impl Default for ParseUrl { impl ParseUrl { pub fn new() -> Self { Self { - signature: Signature::one_of( - vec![ - TypeSignature::Uniform( - 1, - vec![DataType::Utf8View, DataType::Utf8, DataType::LargeUtf8], - ), - TypeSignature::Uniform( - 2, - vec![DataType::Utf8View, DataType::Utf8, DataType::LargeUtf8], - ), - TypeSignature::Uniform( - 3, - vec![DataType::Utf8View, DataType::Utf8, DataType::LargeUtf8], - ), - ], - Volatility::Immutable, - ), + signature: Signature::user_defined(Volatility::Immutable), } } /// Parses a URL and extracts the specified component. @@ -152,22 +135,39 @@ impl ScalarUDFImpl for ParseUrl { arg_types.len() ); } + // The return type should match the largest size datatype match arg_types.len() { - 2 | 3 => { + 2 | 3 if arg_types.iter().all(is_string_type) => { if arg_types .iter() .any(|arg| matches!(arg, DataType::LargeUtf8)) { Ok(DataType::LargeUtf8) - } else if arg_types - .iter() - .any(|arg| matches!(arg, DataType::Utf8View)) - { - Ok(DataType::Utf8View) } else { Ok(DataType::Utf8) } } + 2 | 3 => plan_err!( + "`{}` expects STRING arguments, got {:?}", + &self.name(), + arg_types + ), + _ => plan_err!( + "`{}` expects 2 or 3 arguments, got {}", + &self.name(), + arg_types.len() + ), + } + } + + fn coerce_types(&self, arg_types: &[DataType]) -> Result> { + match arg_types.len() { + 2 | 3 if arg_types.iter().all(is_string_type) => Ok(arg_types.to_vec()), + 2 | 3 => plan_err!( + "`{}` expects STRING arguments, got {:?}", + &self.name(), + arg_types + ), _ => plan_err!( "`{}` expects 2 or 3 arguments, got {}", &self.name(), @@ -182,6 +182,13 @@ impl ScalarUDFImpl for ParseUrl { } } +fn is_string_type(dt: &DataType) -> bool { + matches!( + dt, + DataType::Utf8 | DataType::Utf8View | DataType::LargeUtf8 + ) +} + /// Core implementation of URL parsing function. /// /// # Arguments @@ -199,6 +206,13 @@ impl ScalarUDFImpl for ParseUrl { /// - The output array type (StringArray or LargeStringArray) is determined by input types /// fn spark_parse_url(args: &[ArrayRef]) -> Result { + spark_handled_parse_url(args, |x| x) +} + +pub fn spark_handled_parse_url( + args: &[ArrayRef], + handler_err: impl Fn(Result>) -> Result>, +) -> Result { if args.len() < 2 || args.len() > 3 { return exec_err!( "{} expects 2 or 3 arguments, but got {}", @@ -211,14 +225,115 @@ fn spark_parse_url(args: &[ArrayRef]) -> Result { let part = &args[1]; let result = if args.len() == 3 { + // In this case, the 'key' argument is passed let key = &args[2]; + // Cover all 27 possible cases: 3 arguments, each of which can take 3 different data types. + // If any argument is of type LargeUtf8, the resulting data type will be LargeStringArray. + // Otherwise, the result will be a StringArray. match (url.data_type(), part.data_type(), key.data_type()) { (DataType::Utf8, DataType::Utf8, DataType::Utf8) => { process_parse_url::<_, _, _, StringArray>( as_string_array(url)?, as_string_array(part)?, as_string_array(key)?, + handler_err, + ) + } + (DataType::Utf8, DataType::Utf8, DataType::Utf8View) => { + process_parse_url::<_, _, _, StringArray>( + as_string_array(url)?, + as_string_array(part)?, + as_string_view_array(key)?, + handler_err, + ) + } + (DataType::Utf8, DataType::Utf8, DataType::LargeUtf8) => { + process_parse_url::<_, _, _, LargeStringArray>( + as_string_array(url)?, + as_string_array(part)?, + as_large_string_array(key)?, + handler_err, + ) + } + (DataType::Utf8, DataType::Utf8View, DataType::Utf8) => { + process_parse_url::<_, _, _, StringArray>( + as_string_array(url)?, + as_string_view_array(part)?, + as_string_array(key)?, + handler_err, + ) + } + (DataType::Utf8, DataType::Utf8View, DataType::Utf8View) => { + process_parse_url::<_, _, _, StringArray>( + as_string_array(url)?, + as_string_view_array(part)?, + as_string_view_array(key)?, + handler_err, + ) + } + (DataType::Utf8, DataType::Utf8View, DataType::LargeUtf8) => { + process_parse_url::<_, _, _, LargeStringArray>( + as_string_array(url)?, + as_string_view_array(part)?, + as_large_string_array(key)?, + handler_err, + ) + } + (DataType::Utf8, DataType::LargeUtf8, DataType::Utf8) => { + process_parse_url::<_, _, _, LargeStringArray>( + as_string_array(url)?, + as_large_string_array(part)?, + as_string_array(key)?, + handler_err, + ) + } + (DataType::Utf8, DataType::LargeUtf8, DataType::Utf8View) => { + process_parse_url::<_, _, _, LargeStringArray>( + as_string_array(url)?, + as_large_string_array(part)?, + as_string_view_array(key)?, + handler_err, + ) + } + (DataType::Utf8, DataType::LargeUtf8, DataType::LargeUtf8) => { + process_parse_url::<_, _, _, LargeStringArray>( + as_string_array(url)?, + as_large_string_array(part)?, + as_large_string_array(key)?, + handler_err, + ) + } + (DataType::Utf8View, DataType::Utf8, DataType::Utf8) => { + process_parse_url::<_, _, _, StringArray>( + as_string_view_array(url)?, + as_string_array(part)?, + as_string_array(key)?, + handler_err, + ) + } + (DataType::Utf8View, DataType::Utf8, DataType::Utf8View) => { + process_parse_url::<_, _, _, StringArray>( + as_string_view_array(url)?, + as_string_array(part)?, + as_string_view_array(key)?, + handler_err, + ) + } + (DataType::Utf8View, DataType::Utf8, DataType::LargeUtf8) => { + process_parse_url::<_, _, _, LargeStringArray>( + as_string_view_array(url)?, + as_string_array(part)?, + as_large_string_array(key)?, + handler_err, + ) + } + (DataType::Utf8View, DataType::Utf8View, DataType::Utf8) => { + process_parse_url::<_, _, _, StringArray>( + as_string_view_array(url)?, + as_string_view_array(part)?, + as_string_array(key)?, + handler_err, ) } (DataType::Utf8View, DataType::Utf8View, DataType::Utf8View) => { @@ -226,6 +341,103 @@ fn spark_parse_url(args: &[ArrayRef]) -> Result { as_string_view_array(url)?, as_string_view_array(part)?, as_string_view_array(key)?, + handler_err, + ) + } + (DataType::Utf8View, DataType::Utf8View, DataType::LargeUtf8) => { + process_parse_url::<_, _, _, LargeStringArray>( + as_string_view_array(url)?, + as_string_view_array(part)?, + as_large_string_array(key)?, + handler_err, + ) + } + (DataType::Utf8View, DataType::LargeUtf8, DataType::Utf8) => { + process_parse_url::<_, _, _, LargeStringArray>( + as_string_view_array(url)?, + as_large_string_array(part)?, + as_string_array(key)?, + handler_err, + ) + } + (DataType::Utf8View, DataType::LargeUtf8, DataType::Utf8View) => { + process_parse_url::<_, _, _, LargeStringArray>( + as_string_view_array(url)?, + as_large_string_array(part)?, + as_string_view_array(key)?, + handler_err, + ) + } + (DataType::Utf8View, DataType::LargeUtf8, DataType::LargeUtf8) => { + process_parse_url::<_, _, _, LargeStringArray>( + as_string_view_array(url)?, + as_large_string_array(part)?, + as_large_string_array(key)?, + handler_err, + ) + } + (DataType::LargeUtf8, DataType::Utf8, DataType::Utf8) => { + process_parse_url::<_, _, _, LargeStringArray>( + as_large_string_array(url)?, + as_string_array(part)?, + as_string_array(key)?, + handler_err, + ) + } + (DataType::LargeUtf8, DataType::Utf8, DataType::Utf8View) => { + process_parse_url::<_, _, _, LargeStringArray>( + as_large_string_array(url)?, + as_string_array(part)?, + as_string_view_array(key)?, + handler_err, + ) + } + (DataType::LargeUtf8, DataType::Utf8, DataType::LargeUtf8) => { + process_parse_url::<_, _, _, LargeStringArray>( + as_large_string_array(url)?, + as_string_array(part)?, + as_large_string_array(key)?, + handler_err, + ) + } + (DataType::LargeUtf8, DataType::Utf8View, DataType::Utf8) => { + process_parse_url::<_, _, _, LargeStringArray>( + as_large_string_array(url)?, + as_string_view_array(part)?, + as_string_array(key)?, + handler_err, + ) + } + (DataType::LargeUtf8, DataType::Utf8View, DataType::Utf8View) => { + process_parse_url::<_, _, _, LargeStringArray>( + as_large_string_array(url)?, + as_string_view_array(part)?, + as_string_view_array(key)?, + handler_err, + ) + } + (DataType::LargeUtf8, DataType::Utf8View, DataType::LargeUtf8) => { + process_parse_url::<_, _, _, LargeStringArray>( + as_large_string_array(url)?, + as_string_view_array(part)?, + as_large_string_array(key)?, + handler_err, + ) + } + (DataType::LargeUtf8, DataType::LargeUtf8, DataType::Utf8) => { + process_parse_url::<_, _, _, LargeStringArray>( + as_large_string_array(url)?, + as_large_string_array(part)?, + as_string_array(key)?, + handler_err, + ) + } + (DataType::LargeUtf8, DataType::LargeUtf8, DataType::Utf8View) => { + process_parse_url::<_, _, _, LargeStringArray>( + as_large_string_array(url)?, + as_large_string_array(part)?, + as_string_view_array(key)?, + handler_err, ) } (DataType::LargeUtf8, DataType::LargeUtf8, DataType::LargeUtf8) => { @@ -233,6 +445,7 @@ fn spark_parse_url(args: &[ArrayRef]) -> Result { as_large_string_array(url)?, as_large_string_array(part)?, as_large_string_array(key)?, + handler_err, ) } _ => exec_err!("{} expects STRING arguments, got {:?}", "`parse_url`", args), @@ -246,12 +459,40 @@ fn spark_parse_url(args: &[ArrayRef]) -> Result { } let key = builder.finish(); + // Handle 9 combinations - 2 arguments, each argument can have 3 different data types + // The result data type would be LargeStringArray if there is any argument with LargeUtf8 data type + // Else the StringArray would be returned match (url.data_type(), part.data_type()) { (DataType::Utf8, DataType::Utf8) => { process_parse_url::<_, _, _, StringArray>( as_string_array(url)?, as_string_array(part)?, &key, + handler_err, + ) + } + (DataType::Utf8, DataType::Utf8View) => { + process_parse_url::<_, _, _, StringArray>( + as_string_array(url)?, + as_string_view_array(part)?, + &key, + handler_err, + ) + } + (DataType::Utf8, DataType::LargeUtf8) => { + process_parse_url::<_, _, _, LargeStringArray>( + as_string_array(url)?, + as_large_string_array(part)?, + &key, + handler_err, + ) + } + (DataType::Utf8View, DataType::Utf8) => { + process_parse_url::<_, _, _, StringArray>( + as_string_view_array(url)?, + as_string_array(part)?, + &key, + handler_err, ) } (DataType::Utf8View, DataType::Utf8View) => { @@ -259,6 +500,31 @@ fn spark_parse_url(args: &[ArrayRef]) -> Result { as_string_view_array(url)?, as_string_view_array(part)?, &key, + handler_err, + ) + } + (DataType::Utf8View, DataType::LargeUtf8) => { + process_parse_url::<_, _, _, LargeStringArray>( + as_string_view_array(url)?, + as_large_string_array(part)?, + &key, + handler_err, + ) + } + (DataType::LargeUtf8, DataType::Utf8) => { + process_parse_url::<_, _, _, LargeStringArray>( + as_large_string_array(url)?, + as_string_array(part)?, + &key, + handler_err, + ) + } + (DataType::LargeUtf8, DataType::Utf8View) => { + process_parse_url::<_, _, _, LargeStringArray>( + as_large_string_array(url)?, + as_string_view_array(part)?, + &key, + handler_err, ) } (DataType::LargeUtf8, DataType::LargeUtf8) => { @@ -266,6 +532,7 @@ fn spark_parse_url(args: &[ArrayRef]) -> Result { as_large_string_array(url)?, as_large_string_array(part)?, &key, + handler_err, ) } _ => exec_err!("{} expects STRING arguments, got {:?}", "`parse_url`", args), @@ -278,6 +545,7 @@ fn process_parse_url<'a, A, B, C, T>( url_array: &'a A, part_array: &'a B, key_array: &'a C, + handle: impl Fn(Result>) -> Result>, ) -> Result where &'a A: StringArrayType<'a>, @@ -291,7 +559,7 @@ where .zip(key_array.iter()) .map(|((url, part), key)| { if let (Some(url), Some(part), key) = (url, part, key) { - ParseUrl::parse(url, part, key) + handle(ParseUrl::parse(url, part, key)) } else { Ok(None) } diff --git a/datafusion/spark/src/function/url/try_parse_url.rs b/datafusion/spark/src/function/url/try_parse_url.rs new file mode 100644 index 0000000000000..6aaa05b784b42 --- /dev/null +++ b/datafusion/spark/src/function/url/try_parse_url.rs @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; + +use crate::function::url::parse_url::{spark_handled_parse_url, ParseUrl}; +use arrow::array::ArrayRef; +use arrow::datatypes::DataType; +use datafusion_common::Result; +use datafusion_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, +}; +use datafusion_functions::utils::make_scalar_function; + +/// TRY_PARSE_URL function for tolerant URL component extraction (never errors; returns NULL on invalid or missing parts). +/// +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct TryParseUrl { + signature: Signature, +} + +impl Default for TryParseUrl { + fn default() -> Self { + Self::new() + } +} + +impl TryParseUrl { + pub fn new() -> Self { + Self { + signature: Signature::user_defined(Volatility::Immutable), + } + } +} + +impl ScalarUDFImpl for TryParseUrl { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "try_parse_url" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + let parse_url: ParseUrl = ParseUrl::new(); + parse_url.return_type(arg_types) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let ScalarFunctionArgs { args, .. } = args; + make_scalar_function(spark_try_parse_url, vec![])(&args) + } + + fn coerce_types(&self, arg_types: &[DataType]) -> Result> { + let parse_url: ParseUrl = ParseUrl::new(); + parse_url.coerce_types(arg_types) + } +} + +fn spark_try_parse_url(args: &[ArrayRef]) -> Result { + spark_handled_parse_url(args, |x| match x { + Err(_) => Ok(None), + result => result, + }) +} diff --git a/datafusion/sqllogictest/test_files/spark/url/try_parse_url.slt b/datafusion/sqllogictest/test_files/spark/url/try_parse_url.slt new file mode 100644 index 0000000000000..dcef21aa0e903 --- /dev/null +++ b/datafusion/sqllogictest/test_files/spark/url/try_parse_url.slt @@ -0,0 +1,72 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# This file was originally created by a porting script from: +# https://github.com/lakehq/sail/blob/b6095cc7fccaf016b47f009ba93b2357dc781a7d/python/pysail/tests/spark/function/test_try_parse_url.txt +# This file is part of the implementation of the datafusion-spark function library. +# For more information, please see: +# https://github.com/apache/datafusion/issues/15914 + +query T +SELECT try_parse_url('https://example.com/a?x=1', 'QUERY', 'x'); +---- +1 + +query T +SELECT try_parse_url('www.example.com/path?x=1', 'HOST'); +---- +NULL + +query T +SELECT try_parse_url('https://example.com/?a=1', 'QUERY', 'b'); +---- +NULL + +query T +SELECT try_parse_url('https://example.com/path#frag', 'REF'); +---- +frag + +query T +SELECT try_parse_url('ftp://user:pwd@ftp.example.com:21/files', 'USERINFO'); +---- +user:pwd + +query T +SELECT try_parse_url('http://[2001:db8::2]:8080/index.html?ok=1', 'HOST'); +---- +[2001:db8::2] + +query T +SELECT try_parse_url('notaurl', 'HOST'); +---- +NULL + +query T +SELECT try_parse_url('https://example.com', 'PATH'); +---- +/ + +query T +SELECT try_parse_url('https://example.com/a/b?x=1&y=2#frag', 'PROTOCOL'); +---- +https + +query T +SELECT try_parse_url('https://ex.com/?Tag=ok', 'QUERY', 'tag'); +---- +NULL From 54e0125b24cff4f78646e0e2a473a2be4ef1cac9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20Fern=C3=A1ndez?= Date: Thu, 11 Sep 2025 13:20:11 +0200 Subject: [PATCH 02/14] suggestions --- .../spark/src/function/url/parse_url.rs | 9 +--- .../test_files/spark/url/try_parse_url.slt | 41 +++++++++++++++++++ 2 files changed, 42 insertions(+), 8 deletions(-) diff --git a/datafusion/spark/src/function/url/parse_url.rs b/datafusion/spark/src/function/url/parse_url.rs index 07b99530c9575..39eb208e8f541 100644 --- a/datafusion/spark/src/function/url/parse_url.rs +++ b/datafusion/spark/src/function/url/parse_url.rs @@ -79,7 +79,7 @@ impl ParseUrl { fn parse(value: &str, part: &str, key: Option<&str>) -> Result> { Url::parse(value) .map_err(|e| exec_datafusion_err!("{e:?}")) - .map(|url| match part { + .map(|url| match part.to_uppercase().as_str() { "HOST" => url.host_str().map(String::from), "PATH" => Some(url.path().to_string()), "QUERY" => match key { @@ -128,13 +128,6 @@ impl ScalarUDFImpl for ParseUrl { } fn return_type(&self, arg_types: &[DataType]) -> Result { - if arg_types.len() < 2 || arg_types.len() > 3 { - return plan_err!( - "{} expects 2 or 3 arguments, but got {}", - self.name(), - arg_types.len() - ); - } // The return type should match the largest size datatype match arg_types.len() { 2 | 3 if arg_types.iter().all(is_string_type) => { diff --git a/datafusion/sqllogictest/test_files/spark/url/try_parse_url.slt b/datafusion/sqllogictest/test_files/spark/url/try_parse_url.slt index dcef21aa0e903..6171fb9ffac63 100644 --- a/datafusion/sqllogictest/test_files/spark/url/try_parse_url.slt +++ b/datafusion/sqllogictest/test_files/spark/url/try_parse_url.slt @@ -26,47 +26,88 @@ SELECT try_parse_url('https://example.com/a?x=1', 'QUERY', 'x'); ---- 1 +SELECT try_parse_url('https://example.com/a?x=1', 'query', 'X'); +---- +1 + query T SELECT try_parse_url('www.example.com/path?x=1', 'HOST'); ---- NULL +SELECT try_parse_url('www.example.com/path?x=1', 'host'); +---- +NULL + query T SELECT try_parse_url('https://example.com/?a=1', 'QUERY', 'b'); ---- NULL +SELECT try_parse_url('https://example.com/?a=1', 'Query', 'b'); +---- +NULL + query T SELECT try_parse_url('https://example.com/path#frag', 'REF'); ---- frag +SELECT try_parse_url('https://example.com/path#frag', 'ref'); +---- +frag + query T SELECT try_parse_url('ftp://user:pwd@ftp.example.com:21/files', 'USERINFO'); ---- user:pwd +SELECT try_parse_url('ftp://user:pwd@ftp.example.com:21/files', 'userinfo'); +---- +user:pwd + query T SELECT try_parse_url('http://[2001:db8::2]:8080/index.html?ok=1', 'HOST'); ---- [2001:db8::2] +SELECT try_parse_url('http://[2001:db8::2]:8080/index.html?ok=1', 'Host'); +---- +[2001:db8::2] + query T SELECT try_parse_url('notaurl', 'HOST'); ---- NULL +SELECT try_parse_url('notaurl', 'host'); +---- +NULL + query T SELECT try_parse_url('https://example.com', 'PATH'); ---- / +SELECT try_parse_url('https://example.com', 'path'); +---- +/ + query T SELECT try_parse_url('https://example.com/a/b?x=1&y=2#frag', 'PROTOCOL'); ---- https +SELECT try_parse_url('https://example.com/a/b?x=1&y=2#frag', 'Protocol'); +---- +https + query T SELECT try_parse_url('https://ex.com/?Tag=ok', 'QUERY', 'tag'); ---- NULL + +SELECT try_parse_url('https://ex.com/?Tag=ok', 'query', 'tag'); +---- +NULL + From 944d2046bdee0d7b9cef258c815993136d4102b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20Fern=C3=A1ndez?= Date: Thu, 11 Sep 2025 14:16:20 +0200 Subject: [PATCH 03/14] fix parse_url --- .../spark/src/function/url/parse_url.rs | 190 +++++++++++++++++- 1 file changed, 187 insertions(+), 3 deletions(-) diff --git a/datafusion/spark/src/function/url/parse_url.rs b/datafusion/spark/src/function/url/parse_url.rs index 39eb208e8f541..a35f4b1573f56 100644 --- a/datafusion/spark/src/function/url/parse_url.rs +++ b/datafusion/spark/src/function/url/parse_url.rs @@ -79,9 +79,13 @@ impl ParseUrl { fn parse(value: &str, part: &str, key: Option<&str>) -> Result> { Url::parse(value) .map_err(|e| exec_datafusion_err!("{e:?}")) - .map(|url| match part.to_uppercase().as_str() { + .map(|url| match part { "HOST" => url.host_str().map(String::from), - "PATH" => Some(url.path().to_string()), + "PATH" => { + let path: String = url.path().to_string(); + let path: String = if path == "/" { "".to_string() } else { path }; + Some(path) + } "QUERY" => match key { None => url.query().map(String::from), Some(key) => url @@ -98,7 +102,13 @@ impl ParseUrl { None => Some(path.to_string()), } } - "AUTHORITY" => Some(url.authority().to_string()), + "AUTHORITY" => { + let authority: String = url.authority().to_string(); + match (url.port(), url.port_or_known_default()) { + (None, Some(port)) => Some(format!("{authority}:{port}")), + _ => Some(authority), + } + }, "USERINFO" => { let username = url.username(); if username.is_empty() { @@ -560,3 +570,177 @@ where .collect::>() .map(|array| Arc::new(array) as ArrayRef) } + + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{ + ArrayRef, Int32Array, LargeStringArray, StringArray, + }; + use arrow::datatypes::DataType; + use datafusion_common::Result; + use std::sync::Arc; + + fn sa(vals: &[Option<&str>]) -> ArrayRef { + Arc::new(StringArray::from(vals.to_vec())) as ArrayRef + } + fn lsa(vals: &[Option<&str>]) -> ArrayRef { + Arc::new(LargeStringArray::from(vals.to_vec())) as ArrayRef + } + + #[test] + fn test_parse_host() -> Result<()> { + let got = ParseUrl::parse("https://example.com/a?x=1", "HOST", None)?; + assert_eq!(got, Some("example.com".to_string())); + Ok(()) + } + + #[test] + fn test_parse_query_no_key_vs_with_key() -> Result<()> { + let got_all = ParseUrl::parse("https://ex.com/p?a=1&b=2", "QUERY", None)?; + assert_eq!(got_all, Some("a=1&b=2".to_string())); + + let got_a = ParseUrl::parse("https://ex.com/p?a=1&b=2", "QUERY", Some("a"))?; + assert_eq!(got_a, Some("1".to_string())); + + let got_c = ParseUrl::parse("https://ex.com/p?a=1&b=2", "QUERY", Some("c"))?; + assert_eq!(got_c, None); + Ok(()) + } + + #[test] + fn test_parse_ref_protocol_userinfo_file_authority() -> Result<()> { + let url = "ftp://user:pwd@ftp.example.com:21/files?x=1#frag"; + assert_eq!(ParseUrl::parse(url, "REF", None)?, Some("frag".to_string())); + assert_eq!(ParseUrl::parse(url, "PROTOCOL", None)?, Some("ftp".to_string())); + assert_eq!(ParseUrl::parse(url, "USERINFO", None)?, Some("user:pwd".to_string())); + assert_eq!(ParseUrl::parse(url, "FILE", None)?, Some("/files?x=1".to_string())); + assert_eq!(ParseUrl::parse(url, "AUTHORITY", None)?, Some("user:pwd@ftp.example.com:21".to_string())); + Ok(()) + } + + #[test] + fn test_parse_path_root_is_empty_string() -> Result<()> { + let got = ParseUrl::parse("https://example.com/", "PATH", None)?; + assert_eq!(got, Some("".to_string())); + Ok(()) + } + + #[test] + fn test_parse_malformed_url_returns_error() { + let err = ParseUrl::parse("notaurl", "HOST", None).unwrap_err(); + let msg = format!("{err}"); + assert!(msg.contains("DataFusion") || msg.contains("error"), "msg was: {msg}"); + } + + #[test] + fn test_spark_utf8_two_args() -> Result<()> { + let urls = sa(&[Some("https://example.com/a?x=1"), Some("https://ex.com/")]); + let parts = sa(&[Some("HOST"), Some("PATH")]); + + let out = spark_handled_parse_url(&[urls, parts], |x| x)?; + let out_sa = out.as_any().downcast_ref::().unwrap(); + + assert_eq!(out_sa.len(), 2); + assert_eq!(out_sa.value(0), "example.com"); + assert_eq!(out_sa.value(1), ""); + Ok(()) + } + + #[test] + fn test_spark_utf8_three_args_query_key() -> Result<()> { + let urls = sa(&[Some("https://example.com/a?x=1&y=2"), Some("https://ex.com/?a=1")]); + let parts = sa(&[Some("QUERY"), Some("QUERY")]); + let keys = sa(&[Some("y"), Some("b")]); + + let out = spark_handled_parse_url(&[urls, parts, keys], |x| x)?; + let out_sa = out.as_any().downcast_ref::().unwrap(); + + assert_eq!(out_sa.len(), 2); + assert_eq!(out_sa.value(0), "2"); + assert!(out_sa.is_null(1)); + Ok(()) + } + + #[test] + fn test_spark_largeutf8_propagation() -> Result<()> { + let urls = lsa(&[Some("http://[2001:db8::2]:8080/index.html?ok=1")]); + let parts = sa(&[Some("HOST")]); + let out = spark_handled_parse_url(&[urls, parts], |x| x)?; + assert!(out.as_any().downcast_ref::().is_some()); + + let lsa_out = out.as_any().downcast_ref::().unwrap(); + assert_eq!(lsa_out.value(0), "[2001:db8::2]"); + Ok(()) + } + + #[test] + fn test_spark_userinfo_and_nulls() -> Result<()> { + let urls = sa(&[ + Some("ftp://user:pwd@ftp.example.com:21/files"), + Some("https://example.com"), + None, + ]); + let parts = sa(&[Some("USERINFO"), Some("USERINFO"), Some("USERINFO")]); + + let out = spark_handled_parse_url(&[urls, parts], |x| x)?; + let out_sa = out.as_any().downcast_ref::().unwrap(); + + assert_eq!(out_sa.len(), 3); + assert_eq!(out_sa.value(0), "user:pwd"); + assert!(out_sa.is_null(1)); + assert!(out_sa.is_null(2)); + Ok(()) + } + + #[test] + fn test_invalid_arg_count() { + let urls = sa(&[Some("https://example.com")]); + let err = spark_handled_parse_url(&[urls.clone()], |x| x).unwrap_err(); + assert!(format!("{err}").contains("expects 2 or 3 arguments")); + + let parts = sa(&[Some("HOST")]); + let keys = sa(&[Some("x")]); + let err = spark_handled_parse_url(&[urls, parts, keys, sa(&[Some("extra")])], |x| x).unwrap_err(); + assert!(format!("{err}").contains("expects 2 or 3 arguments")); + } + + #[test] + fn test_non_string_types_error() { + let urls = sa(&[Some("https://example.com")]); + let bad_part = Arc::new(Int32Array::from(vec![1])) as ArrayRef; + + let err = spark_handled_parse_url(&[urls, bad_part], |x| x).unwrap_err(); + let msg = format!("{err}"); + assert!(msg.contains("expects STRING arguments")); + } + + #[test] + fn test_return_type_and_coercion() -> Result<()> { + let udf = ParseUrl::new(); + + let rt = udf.return_type(&[DataType::Utf8, DataType::Utf8])?; + assert_eq!(rt, DataType::Utf8); + + let rt = udf.return_type(&[DataType::LargeUtf8, DataType::Utf8])?; + assert_eq!(rt, DataType::LargeUtf8); + + let rt = udf.return_type(&[DataType::Utf8, DataType::Utf8, DataType::LargeUtf8])?; + assert_eq!(rt, DataType::LargeUtf8); + + let err = udf.return_type(&[DataType::Int32, DataType::Utf8]).unwrap_err(); + assert!(format!("{err}").contains("expects STRING arguments")); + + let err = udf.return_type(&[DataType::Utf8]).unwrap_err(); + assert!(format!("{err}").contains("expects 2 or 3 arguments")); + + let ok = udf.coerce_types(&[DataType::Utf8, DataType::LargeUtf8])?; + assert_eq!(ok, vec![DataType::Utf8, DataType::LargeUtf8]); + + let err = udf.coerce_types(&[DataType::Utf8]).unwrap_err(); + assert!(format!("{err}").contains("expects 2 or 3 arguments")); + + Ok(()) + } +} From 87405e2f3b4f322a919464223f86a14a0e39e869 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20Fern=C3=A1ndez?= Date: Thu, 11 Sep 2025 15:48:29 +0200 Subject: [PATCH 04/14] fix parse_url --- .../spark/src/function/url/parse_url.rs | 52 +++++++++++++------ .../test_files/spark/url/try_parse_url.slt | 33 +++++++----- 2 files changed, 57 insertions(+), 28 deletions(-) diff --git a/datafusion/spark/src/function/url/parse_url.rs b/datafusion/spark/src/function/url/parse_url.rs index a35f4b1573f56..db8b8de7d37ee 100644 --- a/datafusion/spark/src/function/url/parse_url.rs +++ b/datafusion/spark/src/function/url/parse_url.rs @@ -108,7 +108,7 @@ impl ParseUrl { (None, Some(port)) => Some(format!("{authority}:{port}")), _ => Some(authority), } - }, + } "USERINFO" => { let username = url.username(); if username.is_empty() { @@ -571,13 +571,10 @@ where .map(|array| Arc::new(array) as ArrayRef) } - #[cfg(test)] mod tests { use super::*; - use arrow::array::{ - ArrayRef, Int32Array, LargeStringArray, StringArray, - }; + use arrow::array::{ArrayRef, Int32Array, LargeStringArray, StringArray}; use arrow::datatypes::DataType; use datafusion_common::Result; use std::sync::Arc; @@ -613,10 +610,22 @@ mod tests { fn test_parse_ref_protocol_userinfo_file_authority() -> Result<()> { let url = "ftp://user:pwd@ftp.example.com:21/files?x=1#frag"; assert_eq!(ParseUrl::parse(url, "REF", None)?, Some("frag".to_string())); - assert_eq!(ParseUrl::parse(url, "PROTOCOL", None)?, Some("ftp".to_string())); - assert_eq!(ParseUrl::parse(url, "USERINFO", None)?, Some("user:pwd".to_string())); - assert_eq!(ParseUrl::parse(url, "FILE", None)?, Some("/files?x=1".to_string())); - assert_eq!(ParseUrl::parse(url, "AUTHORITY", None)?, Some("user:pwd@ftp.example.com:21".to_string())); + assert_eq!( + ParseUrl::parse(url, "PROTOCOL", None)?, + Some("ftp".to_string()) + ); + assert_eq!( + ParseUrl::parse(url, "USERINFO", None)?, + Some("user:pwd".to_string()) + ); + assert_eq!( + ParseUrl::parse(url, "FILE", None)?, + Some("/files?x=1".to_string()) + ); + assert_eq!( + ParseUrl::parse(url, "AUTHORITY", None)?, + Some("user:pwd@ftp.example.com:21".to_string()) + ); Ok(()) } @@ -631,7 +640,10 @@ mod tests { fn test_parse_malformed_url_returns_error() { let err = ParseUrl::parse("notaurl", "HOST", None).unwrap_err(); let msg = format!("{err}"); - assert!(msg.contains("DataFusion") || msg.contains("error"), "msg was: {msg}"); + assert!( + msg.contains("DataFusion") || msg.contains("error"), + "msg was: {msg}" + ); } #[test] @@ -650,9 +662,12 @@ mod tests { #[test] fn test_spark_utf8_three_args_query_key() -> Result<()> { - let urls = sa(&[Some("https://example.com/a?x=1&y=2"), Some("https://ex.com/?a=1")]); + let urls = sa(&[ + Some("https://example.com/a?x=1&y=2"), + Some("https://ex.com/?a=1"), + ]); let parts = sa(&[Some("QUERY"), Some("QUERY")]); - let keys = sa(&[Some("y"), Some("b")]); + let keys = sa(&[Some("y"), Some("b")]); let out = spark_handled_parse_url(&[urls, parts, keys], |x| x)?; let out_sa = out.as_any().downcast_ref::().unwrap(); @@ -701,8 +716,10 @@ mod tests { assert!(format!("{err}").contains("expects 2 or 3 arguments")); let parts = sa(&[Some("HOST")]); - let keys = sa(&[Some("x")]); - let err = spark_handled_parse_url(&[urls, parts, keys, sa(&[Some("extra")])], |x| x).unwrap_err(); + let keys = sa(&[Some("x")]); + let err = + spark_handled_parse_url(&[urls, parts, keys, sa(&[Some("extra")])], |x| x) + .unwrap_err(); assert!(format!("{err}").contains("expects 2 or 3 arguments")); } @@ -726,10 +743,13 @@ mod tests { let rt = udf.return_type(&[DataType::LargeUtf8, DataType::Utf8])?; assert_eq!(rt, DataType::LargeUtf8); - let rt = udf.return_type(&[DataType::Utf8, DataType::Utf8, DataType::LargeUtf8])?; + let rt = + udf.return_type(&[DataType::Utf8, DataType::Utf8, DataType::LargeUtf8])?; assert_eq!(rt, DataType::LargeUtf8); - let err = udf.return_type(&[DataType::Int32, DataType::Utf8]).unwrap_err(); + let err = udf + .return_type(&[DataType::Int32, DataType::Utf8]) + .unwrap_err(); assert!(format!("{err}").contains("expects STRING arguments")); let err = udf.return_type(&[DataType::Utf8]).unwrap_err(); diff --git a/datafusion/sqllogictest/test_files/spark/url/try_parse_url.slt b/datafusion/sqllogictest/test_files/spark/url/try_parse_url.slt index 6171fb9ffac63..f541f8eedc0bf 100644 --- a/datafusion/sqllogictest/test_files/spark/url/try_parse_url.slt +++ b/datafusion/sqllogictest/test_files/spark/url/try_parse_url.slt @@ -26,15 +26,17 @@ SELECT try_parse_url('https://example.com/a?x=1', 'QUERY', 'x'); ---- 1 -SELECT try_parse_url('https://example.com/a?x=1', 'query', 'X'); +query T +SELECT try_parse_url('https://example.com/a?x=1', 'query', 'x'); ---- -1 +NULL query T SELECT try_parse_url('www.example.com/path?x=1', 'HOST'); ---- NULL +query T SELECT try_parse_url('www.example.com/path?x=1', 'host'); ---- NULL @@ -44,7 +46,8 @@ SELECT try_parse_url('https://example.com/?a=1', 'QUERY', 'b'); ---- NULL -SELECT try_parse_url('https://example.com/?a=1', 'Query', 'b'); +query T +SELECT try_parse_url('https://example.com/?a=1', 'query', 'b'); ---- NULL @@ -53,33 +56,37 @@ SELECT try_parse_url('https://example.com/path#frag', 'REF'); ---- frag +query T SELECT try_parse_url('https://example.com/path#frag', 'ref'); ---- -frag +NULL query T SELECT try_parse_url('ftp://user:pwd@ftp.example.com:21/files', 'USERINFO'); ---- user:pwd +query T SELECT try_parse_url('ftp://user:pwd@ftp.example.com:21/files', 'userinfo'); ---- -user:pwd +NULL query T SELECT try_parse_url('http://[2001:db8::2]:8080/index.html?ok=1', 'HOST'); ---- [2001:db8::2] -SELECT try_parse_url('http://[2001:db8::2]:8080/index.html?ok=1', 'Host'); +query T +SELECT try_parse_url('http://[2001:db8::2]:8080/index.html?ok=1', 'host'); ---- -[2001:db8::2] +NULL query T SELECT try_parse_url('notaurl', 'HOST'); ---- NULL +query T SELECT try_parse_url('notaurl', 'host'); ---- NULL @@ -87,27 +94,29 @@ NULL query T SELECT try_parse_url('https://example.com', 'PATH'); ---- -/ +(empty) +query T SELECT try_parse_url('https://example.com', 'path'); ---- -/ +NULL query T SELECT try_parse_url('https://example.com/a/b?x=1&y=2#frag', 'PROTOCOL'); ---- https -SELECT try_parse_url('https://example.com/a/b?x=1&y=2#frag', 'Protocol'); +query T +SELECT try_parse_url('https://example.com/a/b?x=1&y=2#frag', 'protocol'); ---- -https +NULL query T SELECT try_parse_url('https://ex.com/?Tag=ok', 'QUERY', 'tag'); ---- NULL +query T SELECT try_parse_url('https://ex.com/?Tag=ok', 'query', 'tag'); ---- NULL - From 532bd38e21a9602f2c60ebce6fed0d726eb2d26e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20Fern=C3=A1ndez?= Date: Thu, 11 Sep 2025 16:02:32 +0200 Subject: [PATCH 05/14] fix parse_url --- .../test_files/spark/url/parse_url.slt | 101 ++++++++++++++++++ 1 file changed, 101 insertions(+) diff --git a/datafusion/sqllogictest/test_files/spark/url/parse_url.slt b/datafusion/sqllogictest/test_files/spark/url/parse_url.slt index cca07ceb6ff43..7d8a8b6b7f9a9 100644 --- a/datafusion/sqllogictest/test_files/spark/url/parse_url.slt +++ b/datafusion/sqllogictest/test_files/spark/url/parse_url.slt @@ -70,6 +70,107 @@ SELECT parse_url('http://userinfo@spark.apache.org/path?query=1#Ref'::string, 'U ---- userinfo +query T +SELECT parse_url('https://example.com/a?x=1', 'QUERY', 'x'); +---- +1 + +query T +SELECT parse_url('https://example.com/a?x=1', 'query', 'x'); +---- +NULL + +query T +SELECT parse_url('www.example.com/path?x=1', 'HOST'); +---- +NULL + +query T +SELECT parse_url('www.example.com/path?x=1', 'host'); +---- +NULL + +query T +SELECT parse_url('https://example.com/?a=1', 'QUERY', 'b'); +---- +NULL + +query T +SELECT parse_url('https://example.com/?a=1', 'query', 'b'); +---- +NULL + +query T +SELECT parse_url('https://example.com/path#frag', 'REF'); +---- +frag + +query T +SELECT parse_url('https://example.com/path#frag', 'ref'); +---- +NULL + +query T +SELECT parse_url('ftp://user:pwd@ftp.example.com:21/files', 'USERINFO'); +---- +user:pwd + +query T +SELECT parse_url('ftp://user:pwd@ftp.example.com:21/files', 'userinfo'); +---- +NULL + +query T +SELECT parse_url('http://[2001:db8::2]:8080/index.html?ok=1', 'HOST'); +---- +[2001:db8::2] + +query T +SELECT parse_url('http://[2001:db8::2]:8080/index.html?ok=1', 'host'); +---- +NULL + +query T +SELECT parse_url('notaurl', 'HOST'); +---- +NULL + +query T +SELECT parse_url('notaurl', 'host'); +---- +NULL + +query T +SELECT parse_url('https://example.com', 'PATH'); +---- +(empty) + +query T +SELECT parse_url('https://example.com', 'path'); +---- +NULL + +query T +SELECT parse_url('https://example.com/a/b?x=1&y=2#frag', 'PROTOCOL'); +---- +https + +query T +SELECT parse_url('https://example.com/a/b?x=1&y=2#frag', 'protocol'); +---- +NULL + +query T +SELECT parse_url('https://ex.com/?Tag=ok', 'QUERY', 'tag'); +---- +NULL + +query T +SELECT parse_url('https://ex.com/?Tag=ok', 'query', 'tag'); +---- +NULL + + statement error parse_url expects 2 or 3 arguments, but got 1 SELECT parse_url('http://userinfo@spark.apache.org/path?query=1#Ref'::string); From 37dc796231c6bbd9237e7f664925c06bd301a661 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20Fern=C3=A1ndez?= Date: Fri, 12 Sep 2025 18:28:01 +0200 Subject: [PATCH 06/14] suggestions --- datafusion/spark/src/function/url/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/spark/src/function/url/mod.rs b/datafusion/spark/src/function/url/mod.rs index 3aabf6024eb8b..8d10edf6bbfc0 100644 --- a/datafusion/spark/src/function/url/mod.rs +++ b/datafusion/spark/src/function/url/mod.rs @@ -28,8 +28,8 @@ make_udf_function!(try_parse_url::TryParseUrl, try_parse_url); pub mod expr_fn { use datafusion_functions::export_functions; - export_functions!((parse_url, "Extracts a part from a URL.", args)); - export_functions!((try_parse_url, "This is a special version of parse_url that performs the same operation, but returns a NULL value instead of raising an error if the parsing cannot be performed.", args)); + export_functions!((parse_url, "Extracts a part from a URL, throwing an error if an invalid URL is provided.", args)); + export_functions!((try_parse_url, "Same as parse_url but returns NULL if an invalid URL is provided.", args)); } pub fn functions() -> Vec> { From 36ead8b5c2bc5702b8884e8b9e5fd737c33abe78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20Fern=C3=A1ndez?= Date: Fri, 12 Sep 2025 18:32:02 +0200 Subject: [PATCH 07/14] suggestions --- .../spark/src/function/url/parse_url.rs | 282 +----------------- 1 file changed, 12 insertions(+), 270 deletions(-) diff --git a/datafusion/spark/src/function/url/parse_url.rs b/datafusion/spark/src/function/url/parse_url.rs index 4579bf49df173..cbbbaff755b01 100644 --- a/datafusion/spark/src/function/url/parse_url.rs +++ b/datafusion/spark/src/function/url/parse_url.rs @@ -27,9 +27,7 @@ use datafusion_common::cast::{ as_large_string_array, as_string_array, as_string_view_array, }; use datafusion_common::{exec_datafusion_err, exec_err, plan_err, Result}; -use datafusion_expr::{ - ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, -}; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility}; use datafusion_functions::utils::make_scalar_function; use url::Url; @@ -47,7 +45,13 @@ impl Default for ParseUrl { impl ParseUrl { pub fn new() -> Self { Self { - signature: Signature::user_defined(Volatility::Immutable), + signature: Signature::one_of( + vec![ + TypeSignature::String(2), + TypeSignature::String(3), + ], + Volatility::Immutable, + ), } } /// Parses a URL and extracts the specified component. @@ -164,22 +168,6 @@ impl ScalarUDFImpl for ParseUrl { } } - fn coerce_types(&self, arg_types: &[DataType]) -> Result> { - match arg_types.len() { - 2 | 3 if arg_types.iter().all(is_string_type) => Ok(arg_types.to_vec()), - 2 | 3 => plan_err!( - "`{}` expects STRING arguments, got {:?}", - &self.name(), - arg_types - ), - _ => plan_err!( - "`{}` expects 2 or 3 arguments, got {}", - &self.name(), - arg_types.len() - ), - } - } - fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { let ScalarFunctionArgs { args, .. } = args; make_scalar_function(spark_parse_url, vec![])(&args) @@ -232,9 +220,6 @@ pub fn spark_handled_parse_url( // In this case, the 'key' argument is passed let key = &args[2]; - // Cover all 27 possible cases: 3 arguments, each of which can take 3 different data types. - // If any argument is of type LargeUtf8, the resulting data type will be LargeStringArray. - // Otherwise, the result will be a StringArray. match (url.data_type(), part.data_type(), key.data_type()) { (DataType::Utf8, DataType::Utf8, DataType::Utf8) => { process_parse_url::<_, _, _, StringArray>( @@ -244,7 +229,7 @@ pub fn spark_handled_parse_url( handler_err, ) } - (DataType::Utf8, DataType::Utf8, DataType::Utf8View) => { + (DataType::Utf8View, DataType::Utf8View, DataType::Utf8View) => { process_parse_url::<_, _, _, StringArray>( as_string_array(url)?, as_string_array(part)?, @@ -252,202 +237,10 @@ pub fn spark_handled_parse_url( handler_err, ) } - (DataType::Utf8, DataType::Utf8, DataType::LargeUtf8) => { - process_parse_url::<_, _, _, LargeStringArray>( - as_string_array(url)?, - as_string_array(part)?, - as_large_string_array(key)?, - handler_err, - ) - } - (DataType::Utf8, DataType::Utf8View, DataType::Utf8) => { - process_parse_url::<_, _, _, StringArray>( - as_string_array(url)?, - as_string_view_array(part)?, - as_string_array(key)?, - handler_err, - ) - } - (DataType::Utf8, DataType::Utf8View, DataType::Utf8View) => { - process_parse_url::<_, _, _, StringArray>( - as_string_array(url)?, - as_string_view_array(part)?, - as_string_view_array(key)?, - handler_err, - ) - } - (DataType::Utf8, DataType::Utf8View, DataType::LargeUtf8) => { - process_parse_url::<_, _, _, LargeStringArray>( - as_string_array(url)?, - as_string_view_array(part)?, - as_large_string_array(key)?, - handler_err, - ) - } - (DataType::Utf8, DataType::LargeUtf8, DataType::Utf8) => { - process_parse_url::<_, _, _, LargeStringArray>( - as_string_array(url)?, - as_large_string_array(part)?, - as_string_array(key)?, - handler_err, - ) - } - (DataType::Utf8, DataType::LargeUtf8, DataType::Utf8View) => { - process_parse_url::<_, _, _, LargeStringArray>( - as_string_array(url)?, - as_large_string_array(part)?, - as_string_view_array(key)?, - handler_err, - ) - } - (DataType::Utf8, DataType::LargeUtf8, DataType::LargeUtf8) => { + (DataType::LargeUtf8, DataType::LargeUtf8, DataType::LargeUtf8) => { process_parse_url::<_, _, _, LargeStringArray>( as_string_array(url)?, - as_large_string_array(part)?, - as_large_string_array(key)?, - handler_err, - ) - } - (DataType::Utf8View, DataType::Utf8, DataType::Utf8) => { - process_parse_url::<_, _, _, StringArray>( - as_string_view_array(url)?, - as_string_array(part)?, - as_string_array(key)?, - handler_err, - ) - } - (DataType::Utf8View, DataType::Utf8, DataType::Utf8View) => { - process_parse_url::<_, _, _, StringArray>( - as_string_view_array(url)?, - as_string_array(part)?, - as_string_view_array(key)?, - handler_err, - ) - } - (DataType::Utf8View, DataType::Utf8, DataType::LargeUtf8) => { - process_parse_url::<_, _, _, LargeStringArray>( - as_string_view_array(url)?, - as_string_array(part)?, - as_large_string_array(key)?, - handler_err, - ) - } - (DataType::Utf8View, DataType::Utf8View, DataType::Utf8) => { - process_parse_url::<_, _, _, StringArray>( - as_string_view_array(url)?, - as_string_view_array(part)?, - as_string_array(key)?, - handler_err, - ) - } - (DataType::Utf8View, DataType::Utf8View, DataType::Utf8View) => { - process_parse_url::<_, _, _, StringViewArray>( - as_string_view_array(url)?, - as_string_view_array(part)?, - as_string_view_array(key)?, - handler_err, - ) - } - (DataType::Utf8View, DataType::Utf8View, DataType::LargeUtf8) => { - process_parse_url::<_, _, _, LargeStringArray>( - as_string_view_array(url)?, - as_string_view_array(part)?, - as_large_string_array(key)?, - handler_err, - ) - } - (DataType::Utf8View, DataType::LargeUtf8, DataType::Utf8) => { - process_parse_url::<_, _, _, LargeStringArray>( - as_string_view_array(url)?, - as_large_string_array(part)?, - as_string_array(key)?, - handler_err, - ) - } - (DataType::Utf8View, DataType::LargeUtf8, DataType::Utf8View) => { - process_parse_url::<_, _, _, LargeStringArray>( - as_string_view_array(url)?, - as_large_string_array(part)?, - as_string_view_array(key)?, - handler_err, - ) - } - (DataType::Utf8View, DataType::LargeUtf8, DataType::LargeUtf8) => { - process_parse_url::<_, _, _, LargeStringArray>( - as_string_view_array(url)?, - as_large_string_array(part)?, - as_large_string_array(key)?, - handler_err, - ) - } - (DataType::LargeUtf8, DataType::Utf8, DataType::Utf8) => { - process_parse_url::<_, _, _, LargeStringArray>( - as_large_string_array(url)?, - as_string_array(part)?, - as_string_array(key)?, - handler_err, - ) - } - (DataType::LargeUtf8, DataType::Utf8, DataType::Utf8View) => { - process_parse_url::<_, _, _, LargeStringArray>( - as_large_string_array(url)?, as_string_array(part)?, - as_string_view_array(key)?, - handler_err, - ) - } - (DataType::LargeUtf8, DataType::Utf8, DataType::LargeUtf8) => { - process_parse_url::<_, _, _, LargeStringArray>( - as_large_string_array(url)?, - as_string_array(part)?, - as_large_string_array(key)?, - handler_err, - ) - } - (DataType::LargeUtf8, DataType::Utf8View, DataType::Utf8) => { - process_parse_url::<_, _, _, LargeStringArray>( - as_large_string_array(url)?, - as_string_view_array(part)?, - as_string_array(key)?, - handler_err, - ) - } - (DataType::LargeUtf8, DataType::Utf8View, DataType::Utf8View) => { - process_parse_url::<_, _, _, LargeStringArray>( - as_large_string_array(url)?, - as_string_view_array(part)?, - as_string_view_array(key)?, - handler_err, - ) - } - (DataType::LargeUtf8, DataType::Utf8View, DataType::LargeUtf8) => { - process_parse_url::<_, _, _, LargeStringArray>( - as_large_string_array(url)?, - as_string_view_array(part)?, - as_large_string_array(key)?, - handler_err, - ) - } - (DataType::LargeUtf8, DataType::LargeUtf8, DataType::Utf8) => { - process_parse_url::<_, _, _, LargeStringArray>( - as_large_string_array(url)?, - as_large_string_array(part)?, - as_string_array(key)?, - handler_err, - ) - } - (DataType::LargeUtf8, DataType::LargeUtf8, DataType::Utf8View) => { - process_parse_url::<_, _, _, LargeStringArray>( - as_large_string_array(url)?, - as_large_string_array(part)?, - as_string_view_array(key)?, - handler_err, - ) - } - (DataType::LargeUtf8, DataType::LargeUtf8, DataType::LargeUtf8) => { - process_parse_url::<_, _, _, LargeStringArray>( - as_large_string_array(url)?, - as_large_string_array(part)?, as_large_string_array(key)?, handler_err, ) @@ -463,9 +256,6 @@ pub fn spark_handled_parse_url( } let key = builder.finish(); - // Handle 9 combinations - 2 arguments, each argument can have 3 different data types - // The result data type would be LargeStringArray if there is any argument with LargeUtf8 data type - // Else the StringArray would be returned match (url.data_type(), part.data_type()) { (DataType::Utf8, DataType::Utf8) => { process_parse_url::<_, _, _, StringArray>( @@ -475,7 +265,7 @@ pub fn spark_handled_parse_url( handler_err, ) } - (DataType::Utf8, DataType::Utf8View) => { + (DataType::Utf8View, DataType::Utf8View) => { process_parse_url::<_, _, _, StringArray>( as_string_array(url)?, as_string_view_array(part)?, @@ -483,57 +273,9 @@ pub fn spark_handled_parse_url( handler_err, ) } - (DataType::Utf8, DataType::LargeUtf8) => { - process_parse_url::<_, _, _, LargeStringArray>( - as_string_array(url)?, - as_large_string_array(part)?, - &key, - handler_err, - ) - } - (DataType::Utf8View, DataType::Utf8) => { - process_parse_url::<_, _, _, StringArray>( - as_string_view_array(url)?, - as_string_array(part)?, - &key, - handler_err, - ) - } - (DataType::Utf8View, DataType::Utf8View) => { - process_parse_url::<_, _, _, StringViewArray>( - as_string_view_array(url)?, - as_string_view_array(part)?, - &key, - handler_err, - ) - } - (DataType::Utf8View, DataType::LargeUtf8) => { - process_parse_url::<_, _, _, LargeStringArray>( - as_string_view_array(url)?, - as_large_string_array(part)?, - &key, - handler_err, - ) - } - (DataType::LargeUtf8, DataType::Utf8) => { - process_parse_url::<_, _, _, LargeStringArray>( - as_large_string_array(url)?, - as_string_array(part)?, - &key, - handler_err, - ) - } - (DataType::LargeUtf8, DataType::Utf8View) => { - process_parse_url::<_, _, _, LargeStringArray>( - as_large_string_array(url)?, - as_string_view_array(part)?, - &key, - handler_err, - ) - } (DataType::LargeUtf8, DataType::LargeUtf8) => { process_parse_url::<_, _, _, LargeStringArray>( - as_large_string_array(url)?, + as_string_view_array(url)?, as_large_string_array(part)?, &key, handler_err, From a07f7eb9f760986e18a6195265ebf4c681dc37a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20Fern=C3=A1ndez?= Date: Fri, 12 Sep 2025 19:07:00 +0200 Subject: [PATCH 08/14] suggestions --- datafusion/spark/src/function/url/parse_url.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/datafusion/spark/src/function/url/parse_url.rs b/datafusion/spark/src/function/url/parse_url.rs index cbbbaff755b01..5e017607fcd15 100644 --- a/datafusion/spark/src/function/url/parse_url.rs +++ b/datafusion/spark/src/function/url/parse_url.rs @@ -230,17 +230,17 @@ pub fn spark_handled_parse_url( ) } (DataType::Utf8View, DataType::Utf8View, DataType::Utf8View) => { - process_parse_url::<_, _, _, StringArray>( - as_string_array(url)?, - as_string_array(part)?, + process_parse_url::<_, _, _, StringViewArray>( + as_string_view_array(url)?, + as_string_view_array(part)?, as_string_view_array(key)?, handler_err, ) } (DataType::LargeUtf8, DataType::LargeUtf8, DataType::LargeUtf8) => { process_parse_url::<_, _, _, LargeStringArray>( - as_string_array(url)?, - as_string_array(part)?, + as_large_string_array(url)?, + as_large_string_array(part)?, as_large_string_array(key)?, handler_err, ) @@ -266,8 +266,8 @@ pub fn spark_handled_parse_url( ) } (DataType::Utf8View, DataType::Utf8View) => { - process_parse_url::<_, _, _, StringArray>( - as_string_array(url)?, + process_parse_url::<_, _, _, StringViewArray>( + as_string_view_array(url)?, as_string_view_array(part)?, &key, handler_err, @@ -275,7 +275,7 @@ pub fn spark_handled_parse_url( } (DataType::LargeUtf8, DataType::LargeUtf8) => { process_parse_url::<_, _, _, LargeStringArray>( - as_string_view_array(url)?, + as_large_string_array(url)?, as_large_string_array(part)?, &key, handler_err, From 80e725908917e192fbbdf922a4fa65e0bede9a67 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20Fern=C3=A1ndez?= Date: Fri, 12 Sep 2025 19:09:38 +0200 Subject: [PATCH 09/14] suggestions --- .../spark/src/function/url/parse_url.rs | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/datafusion/spark/src/function/url/parse_url.rs b/datafusion/spark/src/function/url/parse_url.rs index 5e017607fcd15..78bf011ce6a2c 100644 --- a/datafusion/spark/src/function/url/parse_url.rs +++ b/datafusion/spark/src/function/url/parse_url.rs @@ -143,23 +143,29 @@ impl ScalarUDFImpl for ParseUrl { } fn return_type(&self, arg_types: &[DataType]) -> Result { - // The return type should match the largest size datatype + if arg_types.len() < 2 || arg_types.len() > 3 { + return plan_err!( + "{} expects 2 or 3 arguments, but got {}", + self.name(), + arg_types.len() + ); + } match arg_types.len() { - 2 | 3 if arg_types.iter().all(is_string_type) => { + 2 | 3 => { if arg_types .iter() .any(|arg| matches!(arg, DataType::LargeUtf8)) { Ok(DataType::LargeUtf8) + } else if arg_types + .iter() + .any(|arg| matches!(arg, DataType::Utf8View)) + { + Ok(DataType::Utf8View) } else { Ok(DataType::Utf8) } } - 2 | 3 => plan_err!( - "`{}` expects STRING arguments, got {:?}", - &self.name(), - arg_types - ), _ => plan_err!( "`{}` expects 2 or 3 arguments, got {}", &self.name(), @@ -174,13 +180,6 @@ impl ScalarUDFImpl for ParseUrl { } } -fn is_string_type(dt: &DataType) -> bool { - matches!( - dt, - DataType::Utf8 | DataType::Utf8View | DataType::LargeUtf8 - ) -} - /// Core implementation of URL parsing function. /// /// # Arguments From b58b4c1339ae459fc5fa2c2f7b9f56ab0f2cbaab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20Fern=C3=A1ndez?= Date: Sat, 13 Sep 2025 17:13:57 +0200 Subject: [PATCH 10/14] edit --- datafusion/spark/src/function/url/mod.rs | 12 ++++++++++-- datafusion/spark/src/function/url/parse_url.rs | 10 +++++----- datafusion/spark/src/function/url/try_parse_url.rs | 13 ++++++------- 3 files changed, 21 insertions(+), 14 deletions(-) diff --git a/datafusion/spark/src/function/url/mod.rs b/datafusion/spark/src/function/url/mod.rs index 8d10edf6bbfc0..82bf8a9e09616 100644 --- a/datafusion/spark/src/function/url/mod.rs +++ b/datafusion/spark/src/function/url/mod.rs @@ -28,8 +28,16 @@ make_udf_function!(try_parse_url::TryParseUrl, try_parse_url); pub mod expr_fn { use datafusion_functions::export_functions; - export_functions!((parse_url, "Extracts a part from a URL, throwing an error if an invalid URL is provided.", args)); - export_functions!((try_parse_url, "Same as parse_url but returns NULL if an invalid URL is provided.", args)); + export_functions!(( + parse_url, + "Extracts a part from a URL, throwing an error if an invalid URL is provided.", + args + )); + export_functions!(( + try_parse_url, + "Same as parse_url but returns NULL if an invalid URL is provided.", + args + )); } pub fn functions() -> Vec> { diff --git a/datafusion/spark/src/function/url/parse_url.rs b/datafusion/spark/src/function/url/parse_url.rs index 78bf011ce6a2c..6d151fbef8942 100644 --- a/datafusion/spark/src/function/url/parse_url.rs +++ b/datafusion/spark/src/function/url/parse_url.rs @@ -27,7 +27,10 @@ use datafusion_common::cast::{ as_large_string_array, as_string_array, as_string_view_array, }; use datafusion_common::{exec_datafusion_err, exec_err, plan_err, Result}; -use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility}; +use datafusion_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, + Volatility, +}; use datafusion_functions::utils::make_scalar_function; use url::Url; @@ -46,10 +49,7 @@ impl ParseUrl { pub fn new() -> Self { Self { signature: Signature::one_of( - vec![ - TypeSignature::String(2), - TypeSignature::String(3), - ], + vec![TypeSignature::String(2), TypeSignature::String(3)], Volatility::Immutable, ), } diff --git a/datafusion/spark/src/function/url/try_parse_url.rs b/datafusion/spark/src/function/url/try_parse_url.rs index 6aaa05b784b42..c04850f3a6bf0 100644 --- a/datafusion/spark/src/function/url/try_parse_url.rs +++ b/datafusion/spark/src/function/url/try_parse_url.rs @@ -22,7 +22,8 @@ use arrow::array::ArrayRef; use arrow::datatypes::DataType; use datafusion_common::Result; use datafusion_expr::{ - ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, + ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, + Volatility, }; use datafusion_functions::utils::make_scalar_function; @@ -42,7 +43,10 @@ impl Default for TryParseUrl { impl TryParseUrl { pub fn new() -> Self { Self { - signature: Signature::user_defined(Volatility::Immutable), + signature: Signature::one_of( + vec![TypeSignature::String(2), TypeSignature::String(3)], + Volatility::Immutable, + ), } } } @@ -69,11 +73,6 @@ impl ScalarUDFImpl for TryParseUrl { let ScalarFunctionArgs { args, .. } = args; make_scalar_function(spark_try_parse_url, vec![])(&args) } - - fn coerce_types(&self, arg_types: &[DataType]) -> Result> { - let parse_url: ParseUrl = ParseUrl::new(); - parse_url.coerce_types(arg_types) - } } fn spark_try_parse_url(args: &[ArrayRef]) -> Result { From 47d9a21bed1114916747ef7edb7e225c47212147 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20Fern=C3=A1ndez?= Date: Mon, 15 Sep 2025 16:54:15 +0200 Subject: [PATCH 11/14] tests and clippy --- .../spark/src/function/url/parse_url.rs | 51 +++++++++---------- 1 file changed, 24 insertions(+), 27 deletions(-) diff --git a/datafusion/spark/src/function/url/parse_url.rs b/datafusion/spark/src/function/url/parse_url.rs index 6d151fbef8942..ac330b99f0a20 100644 --- a/datafusion/spark/src/function/url/parse_url.rs +++ b/datafusion/spark/src/function/url/parse_url.rs @@ -151,7 +151,7 @@ impl ScalarUDFImpl for ParseUrl { ); } match arg_types.len() { - 2 | 3 => { + 2 | 3 if arg_types.iter().all(is_string_type) => { if arg_types .iter() .any(|arg| matches!(arg, DataType::LargeUtf8)) @@ -166,6 +166,11 @@ impl ScalarUDFImpl for ParseUrl { Ok(DataType::Utf8) } } + 2 | 3 => plan_err!( + "`{}` expects STRING arguments, got {:?}", + &self.name(), + arg_types + ), _ => plan_err!( "`{}` expects 2 or 3 arguments, got {}", &self.name(), @@ -180,6 +185,13 @@ impl ScalarUDFImpl for ParseUrl { } } +fn is_string_type(dt: &DataType) -> bool { + matches!( + dt, + DataType::Utf8 | DataType::Utf8View | DataType::LargeUtf8 + ) +} + /// Core implementation of URL parsing function. /// /// # Arguments @@ -316,17 +328,15 @@ where #[cfg(test)] mod tests { use super::*; - use arrow::array::{ArrayRef, Int32Array, LargeStringArray, StringArray}; + use arrow::array::{ArrayRef, Int32Array, StringArray}; use arrow::datatypes::DataType; use datafusion_common::Result; + use std::array::from_ref; use std::sync::Arc; fn sa(vals: &[Option<&str>]) -> ArrayRef { Arc::new(StringArray::from(vals.to_vec())) as ArrayRef } - fn lsa(vals: &[Option<&str>]) -> ArrayRef { - Arc::new(LargeStringArray::from(vals.to_vec())) as ArrayRef - } #[test] fn test_parse_host() -> Result<()> { @@ -420,18 +430,6 @@ mod tests { Ok(()) } - #[test] - fn test_spark_largeutf8_propagation() -> Result<()> { - let urls = lsa(&[Some("http://[2001:db8::2]:8080/index.html?ok=1")]); - let parts = sa(&[Some("HOST")]); - let out = spark_handled_parse_url(&[urls, parts], |x| x)?; - assert!(out.as_any().downcast_ref::().is_some()); - - let lsa_out = out.as_any().downcast_ref::().unwrap(); - assert_eq!(lsa_out.value(0), "[2001:db8::2]"); - Ok(()) - } - #[test] fn test_spark_userinfo_and_nulls() -> Result<()> { let urls = sa(&[ @@ -454,7 +452,7 @@ mod tests { #[test] fn test_invalid_arg_count() { let urls = sa(&[Some("https://example.com")]); - let err = spark_handled_parse_url(&[urls.clone()], |x| x).unwrap_err(); + let err = spark_handled_parse_url(from_ref(&urls), |x| x).unwrap_err(); assert!(format!("{err}").contains("expects 2 or 3 arguments")); let parts = sa(&[Some("HOST")]); @@ -482,13 +480,18 @@ mod tests { let rt = udf.return_type(&[DataType::Utf8, DataType::Utf8])?; assert_eq!(rt, DataType::Utf8); - let rt = udf.return_type(&[DataType::LargeUtf8, DataType::Utf8])?; + let rt = udf.return_type(&[DataType::LargeUtf8, DataType::LargeUtf8])?; assert_eq!(rt, DataType::LargeUtf8); - let rt = - udf.return_type(&[DataType::Utf8, DataType::Utf8, DataType::LargeUtf8])?; + let rt = udf.return_type(&[DataType::Utf8, DataType::Utf8, DataType::Utf8])?; + assert_eq!(rt, DataType::Utf8); + + let rt = udf.return_type(&[DataType::LargeUtf8, DataType::Utf8])?; assert_eq!(rt, DataType::LargeUtf8); + let rt = udf.return_type(&[DataType::Utf8View, DataType::Utf8])?; + assert_eq!(rt, DataType::Utf8View); + let err = udf .return_type(&[DataType::Int32, DataType::Utf8]) .unwrap_err(); @@ -497,12 +500,6 @@ mod tests { let err = udf.return_type(&[DataType::Utf8]).unwrap_err(); assert!(format!("{err}").contains("expects 2 or 3 arguments")); - let ok = udf.coerce_types(&[DataType::Utf8, DataType::LargeUtf8])?; - assert_eq!(ok, vec![DataType::Utf8, DataType::LargeUtf8]); - - let err = udf.coerce_types(&[DataType::Utf8]).unwrap_err(); - assert!(format!("{err}").contains("expects 2 or 3 arguments")); - Ok(()) } } From 2e0cc6a7c693e6253ca7fdf40c00742f63fd8f84 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20Fern=C3=A1ndez?= Date: Tue, 16 Sep 2025 13:01:28 +0200 Subject: [PATCH 12/14] suggestions and tests --- .../spark/src/function/url/parse_url.rs | 75 +------------------ .../test_files/spark/url/parse_url.slt | 4 - 2 files changed, 2 insertions(+), 77 deletions(-) diff --git a/datafusion/spark/src/function/url/parse_url.rs b/datafusion/spark/src/function/url/parse_url.rs index ac330b99f0a20..1878a1b5d234e 100644 --- a/datafusion/spark/src/function/url/parse_url.rs +++ b/datafusion/spark/src/function/url/parse_url.rs @@ -26,7 +26,7 @@ use arrow::datatypes::DataType; use datafusion_common::cast::{ as_large_string_array, as_string_array, as_string_view_array, }; -use datafusion_common::{exec_datafusion_err, exec_err, plan_err, Result}; +use datafusion_common::{exec_datafusion_err, exec_err, Result}; use datafusion_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility, @@ -143,40 +143,7 @@ impl ScalarUDFImpl for ParseUrl { } fn return_type(&self, arg_types: &[DataType]) -> Result { - if arg_types.len() < 2 || arg_types.len() > 3 { - return plan_err!( - "{} expects 2 or 3 arguments, but got {}", - self.name(), - arg_types.len() - ); - } - match arg_types.len() { - 2 | 3 if arg_types.iter().all(is_string_type) => { - if arg_types - .iter() - .any(|arg| matches!(arg, DataType::LargeUtf8)) - { - Ok(DataType::LargeUtf8) - } else if arg_types - .iter() - .any(|arg| matches!(arg, DataType::Utf8View)) - { - Ok(DataType::Utf8View) - } else { - Ok(DataType::Utf8) - } - } - 2 | 3 => plan_err!( - "`{}` expects STRING arguments, got {:?}", - &self.name(), - arg_types - ), - _ => plan_err!( - "`{}` expects 2 or 3 arguments, got {}", - &self.name(), - arg_types.len() - ), - } + Ok(arg_types[0].clone()) } fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { @@ -185,13 +152,6 @@ impl ScalarUDFImpl for ParseUrl { } } -fn is_string_type(dt: &DataType) -> bool { - matches!( - dt, - DataType::Utf8 | DataType::Utf8View | DataType::LargeUtf8 - ) -} - /// Core implementation of URL parsing function. /// /// # Arguments @@ -329,7 +289,6 @@ where mod tests { use super::*; use arrow::array::{ArrayRef, Int32Array, StringArray}; - use arrow::datatypes::DataType; use datafusion_common::Result; use std::array::from_ref; use std::sync::Arc; @@ -472,34 +431,4 @@ mod tests { let msg = format!("{err}"); assert!(msg.contains("expects STRING arguments")); } - - #[test] - fn test_return_type_and_coercion() -> Result<()> { - let udf = ParseUrl::new(); - - let rt = udf.return_type(&[DataType::Utf8, DataType::Utf8])?; - assert_eq!(rt, DataType::Utf8); - - let rt = udf.return_type(&[DataType::LargeUtf8, DataType::LargeUtf8])?; - assert_eq!(rt, DataType::LargeUtf8); - - let rt = udf.return_type(&[DataType::Utf8, DataType::Utf8, DataType::Utf8])?; - assert_eq!(rt, DataType::Utf8); - - let rt = udf.return_type(&[DataType::LargeUtf8, DataType::Utf8])?; - assert_eq!(rt, DataType::LargeUtf8); - - let rt = udf.return_type(&[DataType::Utf8View, DataType::Utf8])?; - assert_eq!(rt, DataType::Utf8View); - - let err = udf - .return_type(&[DataType::Int32, DataType::Utf8]) - .unwrap_err(); - assert!(format!("{err}").contains("expects STRING arguments")); - - let err = udf.return_type(&[DataType::Utf8]).unwrap_err(); - assert!(format!("{err}").contains("expects 2 or 3 arguments")); - - Ok(()) - } } diff --git a/datafusion/sqllogictest/test_files/spark/url/parse_url.slt b/datafusion/sqllogictest/test_files/spark/url/parse_url.slt index 7d8a8b6b7f9a9..28436021d0227 100644 --- a/datafusion/sqllogictest/test_files/spark/url/parse_url.slt +++ b/datafusion/sqllogictest/test_files/spark/url/parse_url.slt @@ -170,9 +170,5 @@ SELECT parse_url('https://ex.com/?Tag=ok', 'query', 'tag'); ---- NULL - -statement error parse_url expects 2 or 3 arguments, but got 1 -SELECT parse_url('http://userinfo@spark.apache.org/path?query=1#Ref'::string); - statement error 'parse_url' does not support zero arguments SELECT parse_url(); From 8626fdb84e5c5561cdc313bc1f3438a405deea5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20Fern=C3=A1ndez?= Date: Sun, 5 Oct 2025 10:11:56 +0200 Subject: [PATCH 13/14] fixing parse_url --- .../spark/src/function/url/parse_url.rs | 39 ++++++++++--------- .../test_files/spark/url/parse_url.slt | 3 ++ .../test_files/spark/url/try_parse_url.slt | 5 +++ 3 files changed, 28 insertions(+), 19 deletions(-) diff --git a/datafusion/spark/src/function/url/parse_url.rs b/datafusion/spark/src/function/url/parse_url.rs index 1878a1b5d234e..061d09a8fcaa5 100644 --- a/datafusion/spark/src/function/url/parse_url.rs +++ b/datafusion/spark/src/function/url/parse_url.rs @@ -32,7 +32,7 @@ use datafusion_expr::{ Volatility, }; use datafusion_functions::utils::make_scalar_function; -use url::Url; +use url::{ParseError, Url}; #[derive(Debug, PartialEq, Eq, Hash)] pub struct ParseUrl { @@ -82,10 +82,20 @@ impl ParseUrl { /// * `Err(DataFusionError)` - If the URL is malformed and cannot be parsed /// fn parse(value: &str, part: &str, key: Option<&str>) -> Result> { - Url::parse(value) - .map_err(|e| exec_datafusion_err!("{e:?}")) + let url: std::result::Result = Url::parse(value); + if let Err(ParseError::RelativeUrlWithoutBase) = url { + return if !value.contains("://") { + Ok(None) + } else { + Err(exec_datafusion_err!("The url is invalid: {value}. Use `try_parse_url` to tolerate invalid URL and return NULL instead. SQLSTATE: 22P02")) + }; + }; + url.map_err(|e| exec_datafusion_err!("{e:?}")) .map(|url| match part { - "HOST" => url.host_str().map(String::from), + "HOST" => { + 0; + url.host_str().map(String::from) + } "PATH" => { let path: String = url.path().to_string(); let path: String = if path == "/" { "".to_string() } else { path }; @@ -107,13 +117,7 @@ impl ParseUrl { None => Some(path.to_string()), } } - "AUTHORITY" => { - let authority: String = url.authority().to_string(); - match (url.port(), url.port_or_known_default()) { - (None, Some(port)) => Some(format!("{authority}:{port}")), - _ => Some(authority), - } - } + "AUTHORITY" => Some(url.authority().to_string()), "USERINFO" => { let username = url.username(); if username.is_empty() { @@ -335,7 +339,7 @@ mod tests { ); assert_eq!( ParseUrl::parse(url, "AUTHORITY", None)?, - Some("user:pwd@ftp.example.com:21".to_string()) + Some("user:pwd@ftp.example.com".to_string()) ); Ok(()) } @@ -348,13 +352,10 @@ mod tests { } #[test] - fn test_parse_malformed_url_returns_error() { - let err = ParseUrl::parse("notaurl", "HOST", None).unwrap_err(); - let msg = format!("{err}"); - assert!( - msg.contains("DataFusion") || msg.contains("error"), - "msg was: {msg}" - ); + fn test_parse_malformed_url_returns_error() -> Result<()> { + let got = ParseUrl::parse("notaurl", "HOST", None)?; + assert_eq!(got, None); + Ok(()) } #[test] diff --git a/datafusion/sqllogictest/test_files/spark/url/parse_url.slt b/datafusion/sqllogictest/test_files/spark/url/parse_url.slt index 28436021d0227..f2dc55f75598a 100644 --- a/datafusion/sqllogictest/test_files/spark/url/parse_url.slt +++ b/datafusion/sqllogictest/test_files/spark/url/parse_url.slt @@ -172,3 +172,6 @@ NULL statement error 'parse_url' does not support zero arguments SELECT parse_url(); + +query error DataFusion error: Execution error: The url is invalid: inva lid://spark\.apache\.org/path\?query=1\. Use `try_parse_url` to tolerate invalid URL and return NULL instead\. SQLSTATE: 22P02 +SELECT parse_url('inva lid://spark.apache.org/path?query=1', 'QUERY'); diff --git a/datafusion/sqllogictest/test_files/spark/url/try_parse_url.slt b/datafusion/sqllogictest/test_files/spark/url/try_parse_url.slt index f541f8eedc0bf..403747c63c77c 100644 --- a/datafusion/sqllogictest/test_files/spark/url/try_parse_url.slt +++ b/datafusion/sqllogictest/test_files/spark/url/try_parse_url.slt @@ -120,3 +120,8 @@ query T SELECT try_parse_url('https://ex.com/?Tag=ok', 'query', 'tag'); ---- NULL + +query T +SELECT try_parse_url('inva lid://spark.apache.org/path?query=1', 'QUERY'); +---- +NULL From 80e2dbcd9766da3be58a16e4b5d27899e2f98c76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20Fern=C3=A1ndez?= Date: Sun, 5 Oct 2025 10:15:47 +0200 Subject: [PATCH 14/14] fixing parse_url --- datafusion/spark/src/function/url/parse_url.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/datafusion/spark/src/function/url/parse_url.rs b/datafusion/spark/src/function/url/parse_url.rs index 061d09a8fcaa5..d93c260b4f340 100644 --- a/datafusion/spark/src/function/url/parse_url.rs +++ b/datafusion/spark/src/function/url/parse_url.rs @@ -92,10 +92,7 @@ impl ParseUrl { }; url.map_err(|e| exec_datafusion_err!("{e:?}")) .map(|url| match part { - "HOST" => { - 0; - url.host_str().map(String::from) - } + "HOST" => url.host_str().map(String::from), "PATH" => { let path: String = url.path().to_string(); let path: String = if path == "/" { "".to_string() } else { path };