Skip to content
15 changes: 12 additions & 3 deletions docs/source/contributor-guide/spark_expressions_support.md
Original file line number Diff line number Diff line change
Expand Up @@ -476,9 +476,18 @@

### url_funcs

- [ ] parse_url
- [ ] url_decode
- [ ] url_encode
- [x] parse_url
- 3.4.3, 2026-04-29
- 3.5.8, 2026-04-29
- 4.0.1, 2026-04-29: marked Incompatible. Comet tracks the work at https://github.com/apache/datafusion-comet/issues/4156, with the divergences enumerated upstream at https://github.com/apache/datafusion/issues/21943.
- [x] url_decode
- 3.4.3, 2026-04-29
- 3.5.8, 2026-04-29
- 4.0.1, 2026-04-29: `try_url_decode` is not handled correctly. Spark rewrites `try_url_decode(x)` to `StaticInvoke(UrlCodec, "decode", [x, Literal(false)], ...)`, but `CometUrlDecodeStaticInvoke` drops the `failOnError=false` flag and emits a plain `url_decode(x)`, so Comet errors on malformed input where Spark returns NULL. Tracked at https://github.com/apache/datafusion-comet/issues/4155.
- [x] url_encode
- 3.4.3, 2026-04-29
- 3.5.8, 2026-04-29
- 4.0.1, 2026-04-29: replacement StaticInvoke is single-argument and inputs are collation-aware, but encoded output matches 3.4/3.5. No version-specific shim required.

### window_funcs

Expand Down
8 changes: 8 additions & 0 deletions native/core/src/execution/jni_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ use datafusion_spark::function::string::char::CharFunc;
use datafusion_spark::function::string::concat::SparkConcat;
use datafusion_spark::function::string::luhn_check::SparkLuhnCheck;
use datafusion_spark::function::string::space::SparkSpace;
use datafusion_spark::function::url::parse_url::ParseUrl as SparkParseUrl;
use datafusion_spark::function::url::try_parse_url::TryParseUrl as SparkTryParseUrl;
use datafusion_spark::function::url::url_decode::UrlDecode as SparkUrlDecode;
use datafusion_spark::function::url::url_encode::UrlEncode as SparkUrlEncode;
use futures::poll;
use futures::stream::StreamExt;
use futures::FutureExt;
Expand Down Expand Up @@ -567,6 +571,10 @@ fn register_datafusion_spark_function(session_ctx: &SessionContext) {
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkArrayContains::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkBin::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkStrToMap::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkParseUrl::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkTryParseUrl::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkUrlDecode::default()));
session_ctx.register_udf(ScalarUDF::new_from_impl(SparkUrlEncode::default()));
}

/// Prepares arrow arrays for output.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
private val conversionExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
classOf[Cast] -> CometCast)

// Spark URL expressions mapped to their Comet serde handlers. Only ParseUrl
// reaches the planner as a dedicated expression class; url_encode/url_decode
// are rewritten by Spark to StaticInvoke(UrlCodec, ...) and are routed through
// CometStaticInvoke instead.
private val urlExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
classOf[ParseUrl] -> CometParseUrl)

private[comet] val miscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
// TODO PromotePrecision
classOf[Alias] -> CometAlias,
Expand All @@ -257,7 +260,7 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
mathExpressions ++ hashExpressions ++ stringExpressions ++
conditionalExpressions ++ mapExpressions ++ predicateExpressions ++
structExpressions ++ bitwiseExpressions ++ miscExpressions ++ arrayExpressions ++
temporalExpressions ++ conversionExpressions
temporalExpressions ++ conversionExpressions ++ urlExpressions

/**
* Mapping of Spark aggregate expression class to Comet expression handler.
Expand Down
31 changes: 29 additions & 2 deletions spark/src/main/scala/org/apache/comet/serde/statics.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@

package org.apache.comet.serde

import org.apache.spark.sql.catalyst.expressions.{Attribute, ExpressionImplUtils}
import org.apache.spark.sql.catalyst.expressions.{Attribute, ExpressionImplUtils, UrlCodec}
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils

import org.apache.comet.CometSparkSessionExtensions.withInfo
import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto}

object CometStaticInvoke extends CometExpressionSerde[StaticInvoke] {

Expand All @@ -35,7 +36,9 @@ object CometStaticInvoke extends CometExpressionSerde[StaticInvoke] {
Map(
("readSidePadding", classOf[CharVarcharCodegenUtils]) -> CometScalarFunction(
"read_side_padding"),
("isLuhnNumber", classOf[ExpressionImplUtils]) -> CometScalarFunction("luhn_check"))
("isLuhnNumber", classOf[ExpressionImplUtils]) -> CometScalarFunction("luhn_check"),
("encode", UrlCodec.getClass) -> CometUrlEncodeStaticInvoke,
("decode", UrlCodec.getClass) -> CometUrlDecodeStaticInvoke)

override def convert(
expr: StaticInvoke,
Expand All @@ -53,3 +56,27 @@ object CometStaticInvoke extends CometExpressionSerde[StaticInvoke] {
}
}
}

// Serde for the StaticInvoke form UrlCodec.encode(child, "UTF-8"), which Spark
// plans for url_encode(child). Translates to the native url_encode UDF; the
// charset argument is not serialized because it is always the "UTF-8" literal.
object CometUrlEncodeStaticInvoke extends CometExpressionSerde[StaticInvoke] {
  override def convert(
      expr: StaticInvoke,
      inputs: Seq[Attribute],
      binding: Boolean): Option[ExprOuterClass.Expr] = {
    // Serialize only the value argument (first child); drop the charset.
    val serializedValue = exprToProtoInternal(expr.children.head, inputs, binding)
    val nativeCall = scalarFunctionExprToProto("url_encode", serializedValue)
    optExprWithInfo(nativeCall, expr, expr.children: _*)
  }
}

// UrlCodec.decode(child, "UTF-8") -> url_decode(child)
//
// Spark 4.0 also rewrites try_url_decode(x) to
// StaticInvoke(UrlCodec, "decode", [x, Literal(false)]) — the same entry point
// with failOnError = false. Comet's native url_decode always raises on
// malformed input, so translating that form would error where Spark returns
// NULL. Until try_url_decode is supported natively
// (https://github.com/apache/datafusion-comet/issues/4155), fall back to
// Spark whenever the failOnError=false flag is present.
object CometUrlDecodeStaticInvoke extends CometExpressionSerde[StaticInvoke] {
  import org.apache.spark.sql.catalyst.expressions.Literal
  import org.apache.spark.sql.types.BooleanType

  override def convert(
      expr: StaticInvoke,
      inputs: Seq[Attribute],
      binding: Boolean): Option[ExprOuterClass.Expr] = {
    // A Literal(false) boolean argument marks the try_url_decode rewrite; the
    // plain url_decode form carries only the value and the "UTF-8" charset.
    val suppressesErrors = expr.children.exists {
      case Literal(false, BooleanType) => true
      case _ => false
    }
    if (suppressesErrors) {
      withInfo(expr, "try_url_decode (failOnError=false) is not supported natively")
      None
    } else {
      val childExpr = exprToProtoInternal(expr.children.head, inputs, binding)
      val optExpr = scalarFunctionExprToProto("url_decode", childExpr)
      optExprWithInfo(optExpr, expr, expr.children: _*)
    }
  }
}
48 changes: 48 additions & 0 deletions spark/src/main/scala/org/apache/comet/serde/url.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.serde

import org.apache.spark.sql.catalyst.expressions.{Attribute, ParseUrl}

import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithInfo, scalarFunctionExprToProto}

// Serde for Spark's ParseUrl expression. Marked Incompatible because the
// native implementation diverges from Spark on a handful of edge cases.
object CometParseUrl extends CometExpressionSerde[ParseUrl] {

  // Divergences are enumerated upstream at
  // https://github.com/apache/datafusion/issues/21943.
  private val incompatibleReason: String =
    "Native parse_url diverges from Spark on several edge cases. " +
      "See https://github.com/apache/datafusion/issues/21943."

  override def getIncompatibleReasons(): Seq[String] = Seq(incompatibleReason)

  override def getSupportLevel(expr: ParseUrl): SupportLevel =
    Incompatible(Some(incompatibleReason))

  override def convert(
      expr: ParseUrl,
      inputs: Seq[Attribute],
      binding: Boolean): Option[ExprOuterClass.Expr] = {
    // failOnError selects between the throwing and NULL-returning native UDFs
    // (try_parse_url yields NULL for invalid URLs instead of raising).
    val nativeName = if (expr.failOnError) "parse_url" else "try_parse_url"
    val serializedChildren = expr.children.map(exprToProtoInternal(_, inputs, binding))
    optExprWithInfo(
      scalarFunctionExprToProto(nativeName, serializedChildren: _*),
      expr,
      expr.children: _*)
  }
}
37 changes: 37 additions & 0 deletions spark/src/test/resources/sql-tests/expressions/url/parse_url.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- parse_url is marked Incompatible (see CometParseUrl). Known divergences from
-- Spark, tracked upstream at https://github.com/apache/datafusion/issues/21943:
-- 1. empty-string URL returns NULL instead of "" for any part
-- 2. FILE on a URL without an explicit path returns "/?..." instead of "?..."
-- 3. PATH on a URL with a bare trailing slash returns "" instead of "/"
-- In the default configuration, Comet falls back to Spark. The two queries below
-- both verify a normal-shape URL takes the fallback path, and exercise one of
-- the divergent shapes (trailing-slash PATH) to lock in that fallback handles
-- it correctly. See parse_url_native.sql for native-execution coverage.

-- Normal-shape URL, HOST extraction: verifies the default config falls back.
query expect_fallback(not fully compatible with Spark)
SELECT parse_url('http://spark.apache.org/path?query=1', 'HOST')

-- Named query-parameter extraction on the same normal-shape URL.
query expect_fallback(not fully compatible with Spark)
SELECT parse_url('http://spark.apache.org/path?query=1', 'QUERY', 'query')

-- Trailing-slash PATH: Spark returns "/", native impl returns "". Verifying
-- the fallback path emits Spark's "/".
query expect_fallback(not fully compatible with Spark)
SELECT parse_url('http://example.com/', 'PATH')
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- Exercises the native parse_url implementation. Inputs are restricted to
-- URLs with explicit paths because the native implementation diverges from
-- Spark for empty-string input and for FILE extraction on path-less URLs.
-- Tracked upstream at https://github.com/apache/datafusion/issues/21943.

-- Config: spark.comet.expression.ParseUrl.allowIncompatible=true
-- Config: spark.sql.ansi.enabled=true

-- Fixture: URLs with explicit paths (per the header note) plus a NULL row.
statement
CREATE TABLE test_urls_native(url string) USING parquet

statement
INSERT INTO test_urls_native VALUES
('http://spark.apache.org/path?query=1'),
('http://user:password@host:8080/path?key=value&key2=value2#ref'),
('http://example.com/path'),
(NULL)

-- Each extractable URL part, across all fixture rows (including NULL).
query
SELECT parse_url(url, 'HOST') FROM test_urls_native

query
SELECT parse_url(url, 'PATH') FROM test_urls_native

query
SELECT parse_url(url, 'QUERY') FROM test_urls_native

query
SELECT parse_url(url, 'REF') FROM test_urls_native

query
SELECT parse_url(url, 'PROTOCOL') FROM test_urls_native

query
SELECT parse_url(url, 'FILE') FROM test_urls_native

query
SELECT parse_url(url, 'AUTHORITY') FROM test_urls_native

query
SELECT parse_url(url, 'USERINFO') FROM test_urls_native

-- Named query-parameter extraction, including a key absent from every row.
query
SELECT parse_url(url, 'QUERY', 'query') FROM test_urls_native

query
SELECT parse_url(url, 'QUERY', 'key') FROM test_urls_native

query
SELECT parse_url(url, 'QUERY', 'key2') FROM test_urls_native

query
SELECT parse_url(url, 'QUERY', 'nonexistent') FROM test_urls_native

-- Literal (non-column) arguments.
query
SELECT parse_url('http://spark.apache.org/path?query=1', 'HOST')

query
SELECT parse_url('http://spark.apache.org/path?query=1', 'QUERY', 'query')

query
SELECT parse_url(NULL, 'HOST')

-- ANSI-mode invalid URL: parse_url's failOnError is driven by spark.sql.ansi.enabled
-- (set above). Both Spark (INVALID_URL error class) and Comet's native impl
-- produce a message starting "The url is invalid".
query expect_error(The url is invalid)
SELECT parse_url('inva lid://user:pass@host/file', 'HOST')
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- try_parse_url is Spark 4.0+. It rewrites to ParseUrl(_, failOnError=false),
-- so Comet emits the native try_parse_url scalar function. parse_url is marked
-- Incompatible (see CometParseUrl), so this test opts in via allowIncompatible.

-- MinSparkVersion: 4.0
-- Config: spark.comet.expression.ParseUrl.allowIncompatible=true

-- Valid URL: same answer as parse_url.
query
SELECT try_parse_url('http://spark.apache.org/path?query=1', 'HOST')

-- Named query-parameter extraction on the same valid URL.
query
SELECT try_parse_url('http://spark.apache.org/path?query=1', 'QUERY', 'query')

-- Malformed URL with a scheme: Spark returns NULL, Comet's try_parse_url
-- returns NULL (failOnError=false propagates through CometParseUrl to the
-- native try_parse_url UDF).
query
SELECT try_parse_url('inva lid://user:pass@host/file', 'HOST')

-- NULL input.
query
SELECT try_parse_url(NULL, 'HOST')
65 changes: 65 additions & 0 deletions spark/src/test/resources/sql-tests/expressions/url/url_decode.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- url_decode function
-- Fixture mixes typical encodings, '+' (space), multibyte UTF-8, the empty
-- string, NULL, punctuation escapes, and lowercase hex.
statement
CREATE TABLE test_decode(s string) USING parquet

statement
INSERT INTO test_decode VALUES
('https%3A%2F%2Fspark.apache.org'),
('hello+world'),
('a%2Bb%3Dc%26d%3De'),
('caf%C3%A9'),
(''),
(NULL),
('no+encoding+needed'),
('%21%40%23%24%25%5E%26%2A%28%29%5F%2B'),
('%2a%2b%2c')

-- Column input: decode every fixture row, including '' and NULL.
query
SELECT url_decode(s) FROM test_decode

-- literal arguments
query
SELECT url_decode('https%3A%2F%2Fspark.apache.org')

query
SELECT url_decode('hello+world')

query
SELECT url_decode('')

query
SELECT url_decode(NULL)

-- roundtrip: encode then decode
query
SELECT url_decode(url_encode('hello world & goodbye'))

-- multibyte UTF-8
query
SELECT url_decode('%E6%97%A5%E6%9C%AC%E8%AA%9E%E3%83%86%E3%82%B9%E3%83%88')

-- lowercase hex (RFC 3986 says hex digits are case-insensitive)
query
SELECT url_decode('%2a%2b%2c')

-- malformed percent-encoding: both Spark and Comet must error and the bad
-- sequence must appear in the error message
query expect_error(%2s)
SELECT url_decode('http%3A%2F%2spark.apache.org')
Loading
Loading