From cea76d2d6460dc2a68dc7ff4a889e49d080a9d45 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Sat, 24 Jan 2026 16:04:29 +0400 Subject: [PATCH 01/11] feat: Add support for base64 padded encoding and decoding --- datafusion/functions/src/encoding/inner.rs | 49 +++++++----- .../sqllogictest/test_files/encoding.slt | 77 ++++++++++++++++--- 2 files changed, 93 insertions(+), 33 deletions(-) diff --git a/datafusion/functions/src/encoding/inner.rs b/datafusion/functions/src/encoding/inner.rs index ce7f534506d61..baa59b6f85d8a 100644 --- a/datafusion/functions/src/encoding/inner.rs +++ b/datafusion/functions/src/encoding/inner.rs @@ -52,6 +52,12 @@ const BASE64_ENGINE: GeneralPurpose = GeneralPurpose::new( .with_decode_padding_mode(DecodePaddingMode::Indifferent), ); +// Generate padding characters when encoding +const BASE64_ENGINE_PADDED: GeneralPurpose = GeneralPurpose::new( + &base64::alphabet::STANDARD, + GeneralPurposeConfig::new().with_encode_padding(true), +); + #[user_doc( doc_section(label = "Binary String Functions"), description = "Encode binary data into a textual representation.", @@ -62,7 +68,7 @@ const BASE64_ENGINE: GeneralPurpose = GeneralPurpose::new( ), argument( name = "format", - description = "Supported formats are: `base64`, `hex`" + description = "Supported formats are: `base64`, `base64pad`, `hex`" ), related_udf(name = "decode") )] @@ -319,12 +325,18 @@ fn decode_array(array: &ArrayRef, encoding: Encoding) -> Result { #[derive(Debug, Copy, Clone)] enum Encoding { Base64, + Base64Padded, Hex, } impl fmt::Display for Encoding { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", format!("{self:?}").to_lowercase()) + let name = match self { + Self::Base64 => "base64", + Self::Base64Padded => "base64pad", + Self::Hex => "hex", + }; + write!(f, "{name}") } } @@ -345,9 +357,10 @@ impl TryFrom<&ColumnarValue> for Encoding { }; match encoding { "base64" => Ok(Self::Base64), + "base64pad" => Ok(Self::Base64Padded), "hex" => Ok(Self::Hex), _ => { - let options = [Self::Base64, Self::Hex] + let options = [Self::Base64, Self::Base64Padded, Self::Hex] .iter() .map(|i| i.to_string()) .collect::>() @@ -364,15 +377,18 @@ impl Encoding { fn encode_bytes(self, value: &[u8]) -> String { match self { Self::Base64 => BASE64_ENGINE.encode(value), + Self::Base64Padded => BASE64_ENGINE_PADDED.encode(value), Self::Hex => hex::encode(value), } } fn decode_bytes(self, value: &[u8]) -> Result> { match self { - Self::Base64 => BASE64_ENGINE.decode(value).map_err(|e| { - exec_datafusion_err!("Failed to decode value using base64: {e}") - }), + Self::Base64 | Self::Base64Padded => { + BASE64_ENGINE.decode(value).map_err(|e| { + exec_datafusion_err!("Failed to decode value using {self}: {e}") + }) + } Self::Hex => hex::decode(value).map_err(|e| { exec_datafusion_err!("Failed to decode value using hex: {e}") }), @@ -388,20 +404,11 @@ impl Encoding { InputBinaryArray: BinaryArrayType<'a>, OutputOffset: OffsetSizeTrait, { - match self { - Self::Base64 => { - let array: GenericStringArray = array - .iter() - .map(|x| x.map(|x| BASE64_ENGINE.encode(x))) - .collect(); - Ok(Arc::new(array)) - } - Self::Hex => { - let array: GenericStringArray = - array.iter().map(|x| x.map(hex::encode)).collect(); - Ok(Arc::new(array)) - } - } + let array: GenericStringArray = array + .iter() + .map(|x| x.map(|x| self.encode_bytes(x))) + .collect(); + Ok(Arc::new(array)) } // OutputOffset important to ensure Large types output Large arrays @@ -430,7 +437,7 @@ impl Encoding { } match self { - Self::Base64 => { + Self::Base64 | Self::Base64Padded => { let upper_bound = base64::decoded_len_estimate(approx_data_size); delegated_decode::<_, _, OutputOffset>(base64_decode, value, upper_bound) } diff --git a/datafusion/sqllogictest/test_files/encoding.slt b/datafusion/sqllogictest/test_files/encoding.slt index 1b1acbc385348..b04d5061825b4 100644 --- a/datafusion/sqllogictest/test_files/encoding.slt +++ b/datafusion/sqllogictest/test_files/encoding.slt @@ -20,21 +20,41 @@ SELECT encode(arrow_cast('tom', 'Utf8View'),'base64'); ---- dG9t +query T +SELECT encode(arrow_cast('tommy', 'Utf8View'),'base64pad'); +---- +dG9tbXk= + query T SELECT arrow_cast(decode(arrow_cast('dG9t', 'Utf8View'),'base64'), 'Utf8'); ---- tom +query T +SELECT arrow_cast(decode(arrow_cast('dG9tbXk=', 'Utf8View'),'base64pad'), 'Utf8'); +---- +tommy + query T SELECT encode(arrow_cast('tom', 'BinaryView'),'base64'); ---- dG9t +query T +SELECT encode(arrow_cast('tommy', 'BinaryView'),'base64pad'); +---- +dG9tbXk= + query T SELECT arrow_cast(decode(arrow_cast('dG9t', 'BinaryView'),'base64'), 'Utf8'); ---- tom +query T +SELECT arrow_cast(decode(arrow_cast('dG9tbXk=', 'BinaryView'),'base64pad'), 'Utf8'); +---- +tommy + # test for hex digest query T select encode(digest('hello', 'sha256'), 'hex'); @@ -61,10 +81,10 @@ select encode(12, 'hex'); query error DataFusion error: Error during planning: Function 'decode' requires TypeSignatureClass::Binary, but received Int64 \(DataType: Int64\) select decode(12, 'hex'); -query error DataFusion error: Error during planning: There is no built\-in encoding named 'non_encoding', currently supported encodings are: base64, hex +query error DataFusion error: Error during planning: There is no built\-in encoding named 'non_encoding', currently supported encodings are: base64, base64pad, hex select encode('', 'non_encoding'); -query error DataFusion error: Error during planning: There is no built\-in encoding named 'non_encoding', currently supported encodings are: base64, hex +query error DataFusion error: Error during planning: There is no built\-in encoding named 'non_encoding', currently supported encodings are: base64, base64pad, hex select decode('', 'non_encoding'); query error DataFusion error: Execution error: Encoding must be a non-null string @@ -124,11 +144,21 @@ select encode(bin_field, 'base64') FROM test WHERE num = 3; ---- j1DT9g6uNw3b+FyGIZxVEIo1AWU +query T +select encode(bin_field, 'base64pad') FROM test WHERE num = 3; +---- +j1DT9g6uNw3b+FyGIZxVEIo1AWU= + query B select decode(encode(bin_field, 'base64'), 'base64') = X'8f50d3f60eae370ddbf85c86219c55108a350165' FROM test WHERE num = 3; ---- true +query B +select decode(encode(bin_field, 'base64pad'), 'base64pad') = X'8f50d3f60eae370ddbf85c86219c55108a350165' FROM test WHERE num = 3; +---- +true + statement ok drop table test @@ -144,18 +174,20 @@ FROM VALUES ('Raphael', 'R'), (NULL, 'R'); -query TTTT +query TTTTTT SELECT encode(column1_utf8view, 'base64') AS column1_base64, + encode(column1_utf8view, 'base64pad') AS column1_base64pad, encode(column1_utf8view, 'hex') AS column1_hex, encode(column2_utf8view, 'base64') AS column2_base64, + encode(column2_utf8view, 'base64pad') AS column2_base64pad, encode(column2_utf8view, 'hex') AS column2_hex FROM test_utf8view; ---- -QW5kcmV3 416e64726577 WA 58 -WGlhbmdwZW5n 5869616e6770656e67 WGlhbmdwZW5n 5869616e6770656e67 -UmFwaGFlbA 5261706861656c Ug 52 -NULL NULL Ug 52 +QW5kcmV3 QW5kcmV3 416e64726577 WA WA== 58 +WGlhbmdwZW5n WGlhbmdwZW5n 5869616e6770656e67 WGlhbmdwZW5n WGlhbmdwZW5n 5869616e6770656e67 +UmFwaGFlbA UmFwaGFlbA== 5261706861656c Ug Ug== 52 +NULL NULL NULL Ug Ug== 52 query TTTTTT SELECT @@ -172,6 +204,22 @@ WGlhbmdwZW5n WGlhbmdwZW5n WGlhbmdwZW5n WGlhbmdwZW5n WGlhbmdwZW5n WGlhbmdwZW5n UmFwaGFlbA UmFwaGFlbA UmFwaGFlbA UmFwaGFlbA UmFwaGFlbA UmFwaGFlbA NULL NULL NULL NULL NULL NULL + +query TTTTTT +SELECT + encode(arrow_cast(column1_utf8view, 'Utf8'), 'base64pad'), + encode(arrow_cast(column1_utf8view, 'LargeUtf8'), 'base64pad'), + encode(arrow_cast(column1_utf8view, 'Utf8View'), 'base64pad'), + encode(arrow_cast(column1_utf8view, 'Binary'), 'base64pad'), + encode(arrow_cast(column1_utf8view, 'LargeBinary'), 'base64pad'), + encode(arrow_cast(column1_utf8view, 'BinaryView'), 'base64pad') +FROM test_utf8view; +---- +QW5kcmV3 QW5kcmV3 QW5kcmV3 QW5kcmV3 QW5kcmV3 QW5kcmV3 +WGlhbmdwZW5n WGlhbmdwZW5n WGlhbmdwZW5n WGlhbmdwZW5n WGlhbmdwZW5n WGlhbmdwZW5n +UmFwaGFlbA== UmFwaGFlbA== UmFwaGFlbA== UmFwaGFlbA== UmFwaGFlbA== UmFwaGFlbA== +NULL NULL NULL NULL NULL NULL + statement ok drop table test_utf8view @@ -180,26 +228,31 @@ statement ok CREATE TABLE test_fsb AS SELECT arrow_cast(X'0123456789ABCDEF', 'FixedSizeBinary(8)') as fsb_col; -query ?? +query ??? SELECT decode(encode(arrow_cast(X'0123456789abcdef', 'FixedSizeBinary(8)'), 'base64'), 'base64'), + decode(encode(arrow_cast(X'0123456789abcdef', 'FixedSizeBinary(8)'), 'base64pad'), 'base64pad'), decode(encode(arrow_cast(X'0123456789abcdef', 'FixedSizeBinary(8)'), 'hex'), 'hex'); ---- -0123456789abcdef 0123456789abcdef +0123456789abcdef 0123456789abcdef 0123456789abcdef -query ?? +query ??? SELECT decode(encode(column1, 'base64'), 'base64'), + decode(encode(column1, 'base64pad'), 'base64pad'), decode(encode(column1, 'hex'), 'hex') FROM values (arrow_cast(X'0123456789abcdef', 'FixedSizeBinary(8)')), (arrow_cast(X'ffffffffffffffff', 'FixedSizeBinary(8)')); ---- -0123456789abcdef 0123456789abcdef -ffffffffffffffff ffffffffffffffff +0123456789abcdef 0123456789abcdef 0123456789abcdef +ffffffffffffffff ffffffffffffffff ffffffffffffffff query error DataFusion error: Execution error: Failed to decode value using base64 select decode('invalid', 'base64'); +query error DataFusion error: Execution error: Failed to decode value using base64pad +select decode('invalid', 'base64pad'); + query error DataFusion error: Execution error: Failed to decode value using hex select decode('invalid', 'hex'); From d564f7761faa58de13a61a150713c2f8709cc644 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Sat, 24 Jan 2026 16:04:38 +0400 Subject: [PATCH 02/11] fix: Update benchmark to use binary array for base64 and hex decoding --- datafusion/functions/benches/encoding.rs | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/datafusion/functions/benches/encoding.rs b/datafusion/functions/benches/encoding.rs index 8a7c2b7b664b7..af0385b6fc90d 100644 --- a/datafusion/functions/benches/encoding.rs +++ b/datafusion/functions/benches/encoding.rs @@ -19,7 +19,7 @@ extern crate criterion; use arrow::array::Array; use arrow::datatypes::{DataType, Field}; -use arrow::util::bench_util::create_string_array_with_len; +use arrow::util::bench_util::create_binary_array; use criterion::{Criterion, criterion_group, criterion_main}; use datafusion_common::config::ConfigOptions; use datafusion_expr::{ColumnarValue, ScalarFunctionArgs}; @@ -32,20 +32,22 @@ fn criterion_benchmark(c: &mut Criterion) { let config_options = Arc::new(ConfigOptions::default()); for size in [1024, 4096, 8192] { - let str_array = Arc::new(create_string_array_with_len::(size, 0.2, 32)); + let bin_array = Arc::new(create_binary_array::(size, 0.2)); c.bench_function(&format!("base64_decode/{size}"), |b| { let method = ColumnarValue::Scalar("base64".into()); let encoded = encoding::encode() .invoke_with_args(ScalarFunctionArgs { - args: vec![ColumnarValue::Array(str_array.clone()), method.clone()], + args: vec![ColumnarValue::Array(bin_array.clone()), method.clone()], arg_fields: vec![ - Field::new("a", str_array.data_type().to_owned(), true).into(), + Field::new("a", bin_array.data_type().to_owned(), true).into(), Field::new("b", method.data_type().to_owned(), true).into(), ], number_rows: size, return_field: Field::new("f", DataType::Utf8, true).into(), config_options: Arc::clone(&config_options), }) + .unwrap() + .cast_to(&DataType::Binary, None) .unwrap(); let arg_fields = vec![ @@ -61,7 +63,7 @@ fn criterion_benchmark(c: &mut Criterion) { args: args.clone(), arg_fields: arg_fields.clone(), number_rows: size, - return_field: Field::new("f", DataType::Utf8, true).into(), + return_field: Field::new("f", DataType::Binary, true).into(), config_options: Arc::clone(&config_options), }) .unwrap(), @@ -72,24 +74,26 @@ fn criterion_benchmark(c: &mut Criterion) { c.bench_function(&format!("hex_decode/{size}"), |b| { let method = ColumnarValue::Scalar("hex".into()); let arg_fields = vec![ - Field::new("a", str_array.data_type().to_owned(), true).into(), + Field::new("a", bin_array.data_type().to_owned(), true).into(), Field::new("b", method.data_type().to_owned(), true).into(), ]; let encoded = encoding::encode() .invoke_with_args(ScalarFunctionArgs { - args: vec![ColumnarValue::Array(str_array.clone()), method.clone()], + args: vec![ColumnarValue::Array(bin_array.clone()), method.clone()], arg_fields, number_rows: size, return_field: Field::new("f", DataType::Utf8, true).into(), config_options: Arc::clone(&config_options), }) + .unwrap() + .cast_to(&DataType::Binary, None) .unwrap(); let arg_fields = vec![ Field::new("a", encoded.data_type().to_owned(), true).into(), Field::new("b", method.data_type().to_owned(), true).into(), ]; - let return_field = Field::new("f", DataType::Utf8, true).into(); + let return_field = Field::new("f", DataType::Binary, true).into(); let args = vec![encoded, method]; b.iter(|| { From e6794abe8f52ce60e1286f565ca31762c74e567c Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Sat, 24 Jan 2026 16:05:33 +0400 Subject: [PATCH 03/11] feat(spark): add base64 functions --- .../spark/src/function/string/base64.rs | 181 ++++++++++++++++++ datafusion/spark/src/function/string/mod.rs | 15 ++ .../test_files/spark/string/base64.slt | 113 +++++++++-- .../test_files/spark/string/unbase64.slt | 27 --- 4 files changed, 294 insertions(+), 42 deletions(-) create mode 100644 datafusion/spark/src/function/string/base64.rs delete mode 100644 datafusion/sqllogictest/test_files/spark/string/unbase64.slt diff --git a/datafusion/spark/src/function/string/base64.rs b/datafusion/spark/src/function/string/base64.rs new file mode 100644 index 0000000000000..fb398f6f01777 --- /dev/null +++ b/datafusion/spark/src/function/string/base64.rs @@ -0,0 +1,181 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::sync::Arc; + +use arrow::datatypes::DataType; +use datafusion_common::arrow::datatypes::{Field, FieldRef}; +use datafusion_common::types::{NativeType, logical_string}; +use datafusion_common::utils::take_function_args; +use datafusion_common::{Result, ScalarValue, exec_err, internal_err}; +use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyContext}; +use datafusion_expr::{ + Coercion, Expr, ExprSchemable, ReturnFieldArgs, TypeSignatureClass, +}; +use datafusion_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, +}; +use datafusion_functions::expr_fn::{decode, encode}; + +/// +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkBase64 { + signature: Signature, +} + +impl Default for SparkBase64 { + fn default() -> Self { + Self::new() + } +} + +impl SparkBase64 { + pub fn new() -> Self { + Self { + signature: Signature::coercible( + vec![Coercion::new_implicit( + TypeSignatureClass::Binary, + vec![TypeSignatureClass::Native(logical_string())], + NativeType::Binary, + )], + Volatility::Immutable, + ), + } + } +} + +impl ScalarUDFImpl for SparkBase64 { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "base64" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + internal_err!("return_type should not be called for {}", self.name()) + } + + fn return_field_from_args(&self, args: ReturnFieldArgs<'_>) -> Result { + let [bin] = take_function_args(self.name(), args.arg_fields)?; + let return_type = match bin.data_type() { + DataType::LargeBinary => DataType::LargeUtf8, + _ => DataType::Utf8, + }; + Ok(Arc::new(Field::new( + self.name(), + return_type, + bin.is_nullable(), + ))) + } + + fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result { + exec_err!("{} should have been simplified", self.name()) + } + + fn simplify( + &self, + args: Vec, + _info: &SimplifyContext, + ) -> Result { + let [bin] = take_function_args(self.name(), args)?; + Ok(ExprSimplifyResult::Simplified(encode( + bin, + Expr::Literal(ScalarValue::Utf8(Some(String::from("base64pad"))), None), + ))) + } +} + +/// +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkUnBase64 { + signature: Signature, +} + +impl Default for SparkUnBase64 { + fn default() -> Self { + Self::new() + } +} + +impl SparkUnBase64 { + pub fn new() -> Self { + Self { + signature: Signature::coercible( + vec![Coercion::new_implicit( + TypeSignatureClass::Native(logical_string()), + vec![TypeSignatureClass::Binary], + NativeType::String, + )], + Volatility::Immutable, + ), + } + } +} + +impl ScalarUDFImpl for SparkUnBase64 { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "unbase64" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + internal_err!("return_type should not be called for {}", self.name()) + } + + fn return_field_from_args(&self, args: ReturnFieldArgs<'_>) -> Result { + let [str] = take_function_args(self.name(), args.arg_fields)?; + let return_type = match str.data_type() { + DataType::LargeUtf8 => DataType::LargeBinary, + _ => DataType::Binary, + }; + Ok(Arc::new(Field::new( + self.name(), + return_type, + str.is_nullable(), + ))) + } + + fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result { + exec_err!("{} should have been simplified", self.name()) + } + + fn simplify( + &self, + args: Vec, + info: &SimplifyContext, + ) -> Result { + let [str] = take_function_args(self.name(), args)?; + Ok(ExprSimplifyResult::Simplified(decode( + str.cast_to(&DataType::Binary, info.schema())?, + Expr::Literal(ScalarValue::Utf8(Some(String::from("base64pad"))), None), + ))) + } +} diff --git a/datafusion/spark/src/function/string/mod.rs b/datafusion/spark/src/function/string/mod.rs index 1f0108cf509c7..8859beca77996 100644 --- a/datafusion/spark/src/function/string/mod.rs +++ b/datafusion/spark/src/function/string/mod.rs @@ -16,6 +16,7 @@ // under the License. pub mod ascii; +pub mod base64; pub mod char; pub mod concat; pub mod elt; @@ -32,6 +33,7 @@ use datafusion_functions::make_udf_function; use std::sync::Arc; make_udf_function!(ascii::SparkAscii, ascii); +make_udf_function!(base64::SparkBase64, base64); make_udf_function!(char::CharFunc, char); make_udf_function!(concat::SparkConcat, concat); make_udf_function!(ilike::SparkILike, ilike); @@ -42,6 +44,7 @@ make_udf_function!(luhn_check::SparkLuhnCheck, luhn_check); make_udf_function!(format_string::FormatStringFunc, format_string); make_udf_function!(space::SparkSpace, space); make_udf_function!(substring::SparkSubstring, substring); +make_udf_function!(base64::SparkUnBase64, unbase64); pub mod expr_fn { use datafusion_functions::export_functions; @@ -51,6 +54,11 @@ pub mod expr_fn { "Returns the ASCII code point of the first character of string.", arg1 )); + export_functions!(( + base64, + "Encodes the input binary `bin` into a base64 string.", + bin + )); export_functions!(( char, "Returns the ASCII character having the binary equivalent to col. If col is larger than 256 the result is equivalent to char(col % 256).", @@ -97,11 +105,17 @@ pub mod expr_fn { "Returns the substring from string `str` starting at position `pos` with length `length.", str pos length )); + export_functions!(( + unbase64, + "Decodes the input string `str` from a base64 string into binary data.", + str + )); } pub fn functions() -> Vec> { vec![ ascii(), + base64(), char(), concat(), elt(), @@ -112,5 +126,6 @@ pub fn functions() -> Vec> { format_string(), space(), substring(), + unbase64(), ] } diff --git a/datafusion/sqllogictest/test_files/spark/string/base64.slt b/datafusion/sqllogictest/test_files/spark/string/base64.slt index 66edbe8442158..365ff4e313677 100644 --- a/datafusion/sqllogictest/test_files/spark/string/base64.slt +++ b/datafusion/sqllogictest/test_files/spark/string/base64.slt @@ -15,18 +15,101 @@ # specific language governing permissions and limitations # under the License. -# This file was originally created by a porting script from: -# https://github.com/lakehq/sail/tree/43b6ed8221de5c4c4adbedbb267ae1351158b43c/crates/sail-spark-connect/tests/gold_data/function -# This file is part of the implementation of the datafusion-spark function library. -# For more information, please see: -# https://github.com/apache/datafusion/issues/15914 - -## Original Query: SELECT base64('Spark SQL'); -## PySpark 3.5.5 Result: {'base64(Spark SQL)': 'U3BhcmsgU1FM', 'typeof(base64(Spark SQL))': 'string', 'typeof(Spark SQL)': 'string'} -#query -#SELECT base64('Spark SQL'::string); - -## Original Query: SELECT base64(x'537061726b2053514c'); -## PySpark 3.5.5 Result: {"base64(X'537061726B2053514C')": 'U3BhcmsgU1FM', "typeof(base64(X'537061726B2053514C'))": 'string', "typeof(X'537061726B2053514C')": 'binary'} -#query -#SELECT base64(X'537061726B2053514C'::binary); +query T +SELECT base64('Spark SQL'::string); +---- +U3BhcmsgU1FM + +query T +SELECT base64('Spark SQ'::string); +---- +U3BhcmsgU1E= + +query T +SELECT base64('Spark S'::string); +---- +U3BhcmsgUw== + +query T +SELECT base64('Spark SQL'::bytea); +---- +U3BhcmsgU1FM + +query T +SELECT base64(NULL::string); +---- +NULL + +query T +SELECT base64(NULL::bytea); +---- +NULL + +query T +SELECT base64(column1) +FROM VALUES +('Spark SQL'::bytea), +('Spark SQ'::bytea), +('Spark S'::bytea), +(NULL::bytea); +---- +U3BhcmsgU1FM +U3BhcmsgU1E= +U3BhcmsgUw== +NULL + +query error Function 'base64' requires TypeSignatureClass::Binary, but received Int32 \(DataType: Int32\). No function matches the given name and argument types 'base64\(Int32\)'. +SELECT base64(12::integer); + + +query T +SELECT arrow_cast(unbase64('U3BhcmsgU1FM'::string), 'Utf8'); +---- +Spark SQL + +query T +SELECT arrow_cast(unbase64('U3BhcmsgU1E='::string), 'Utf8'); +---- +Spark SQ + +query T +SELECT arrow_cast(unbase64('U3BhcmsgUw=='::string), 'Utf8'); +---- +Spark S + +query T +SELECT arrow_cast(unbase64('U3BhcmsgU1FM'::bytea), 'Utf8'); +---- +Spark SQL + +query ? +SELECT unbase64(NULL::string); +---- +NULL + +query ? +SELECT unbase64(NULL::bytea); +---- +NULL + +query T +SELECT arrow_cast(unbase64(column1), 'Utf8') +FROM VALUES +('U3BhcmsgU1FM'::string), +('U3BhcmsgU1E='::string), +('U3BhcmsgUw=='::string), +(NULL::string); +---- +Spark SQL +Spark SQ +Spark S +NULL + +query error Failed to decode value using base64 +SELECT unbase64('123'::string); + +query error Failed to decode value using base64 +SELECT unbase64('123'::bytea); + +query error Function 'unbase64' requires TypeSignatureClass::Native\(LogicalType\(Native\(String\), String\)\), but received Int32 \(DataType: Int32\). No function matches the given name and argument types 'unbase64\(Int32\)'. +SELECT unbase64(12::integer); diff --git a/datafusion/sqllogictest/test_files/spark/string/unbase64.slt b/datafusion/sqllogictest/test_files/spark/string/unbase64.slt deleted file mode 100644 index 5cf3fbee0455d..0000000000000 --- a/datafusion/sqllogictest/test_files/spark/string/unbase64.slt +++ /dev/null @@ -1,27 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at - -# http://www.apache.org/licenses/LICENSE-2.0 - -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -# This file was originally created by a porting script from: -# https://github.com/lakehq/sail/tree/43b6ed8221de5c4c4adbedbb267ae1351158b43c/crates/sail-spark-connect/tests/gold_data/function -# This file is part of the implementation of the datafusion-spark function library. -# For more information, please see: -# https://github.com/apache/datafusion/issues/15914 - -## Original Query: SELECT unbase64('U3BhcmsgU1FM'); -## PySpark 3.5.5 Result: {'unbase64(U3BhcmsgU1FM)': bytearray(b'Spark SQL'), 'typeof(unbase64(U3BhcmsgU1FM))': 'binary', 'typeof(U3BhcmsgU1FM)': 'string'} -#query -#SELECT unbase64('U3BhcmsgU1FM'::string); From 7b83254ff6ee0b1201d47759f790b9aa3e008261 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Sat, 24 Jan 2026 16:11:53 +0400 Subject: [PATCH 04/11] refactor: use LazyLock for base64 padding encoding in SparkBase64 and SparkUnBase64 --- datafusion/spark/src/function/string/base64.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/datafusion/spark/src/function/string/base64.rs b/datafusion/spark/src/function/string/base64.rs index fb398f6f01777..925d896b7777b 100644 --- a/datafusion/spark/src/function/string/base64.rs +++ b/datafusion/spark/src/function/string/base64.rs @@ -16,7 +16,7 @@ // under the License. use std::any::Any; -use std::sync::Arc; +use std::sync::{Arc, LazyLock}; use arrow::datatypes::DataType; use datafusion_common::arrow::datatypes::{Field, FieldRef}; @@ -32,6 +32,10 @@ use datafusion_expr::{ }; use datafusion_functions::expr_fn::{decode, encode}; +const ENCODING: LazyLock = LazyLock::new(|| { + Expr::Literal(ScalarValue::Utf8(Some(String::from("base64pad"))), None) +}); + /// #[derive(Debug, PartialEq, Eq, Hash)] pub struct SparkBase64 { @@ -101,7 +105,7 @@ impl ScalarUDFImpl for SparkBase64 { let [bin] = take_function_args(self.name(), args)?; Ok(ExprSimplifyResult::Simplified(encode( bin, - Expr::Literal(ScalarValue::Utf8(Some(String::from("base64pad"))), None), + ENCODING.clone(), ))) } } @@ -175,7 +179,7 @@ impl ScalarUDFImpl for SparkUnBase64 { let [str] = take_function_args(self.name(), args)?; Ok(ExprSimplifyResult::Simplified(decode( str.cast_to(&DataType::Binary, info.schema())?, - Expr::Literal(ScalarValue::Utf8(Some(String::from("base64pad"))), None), + ENCODING.clone(), ))) } } From 7c4ab81a30a6ba9d28dd1f71ddb21d855c41e972 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Sat, 24 Jan 2026 16:11:59 +0400 Subject: [PATCH 05/11] docs: update encode function formats to include base64pad --- docs/source/user-guide/sql/scalar_functions.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/user-guide/sql/scalar_functions.md b/docs/source/user-guide/sql/scalar_functions.md index 605c3285c322c..473d7ad84b982 100644 --- a/docs/source/user-guide/sql/scalar_functions.md +++ b/docs/source/user-guide/sql/scalar_functions.md @@ -2175,7 +2175,7 @@ encode(expression, format) #### Arguments - **expression**: Expression containing string or binary data -- **format**: Supported formats are: `base64`, `hex` +- **format**: Supported formats are: `base64`, `base64pad`, `hex` **Related functions**: From d80486869fc1f136005a3ef1c03fb9617b70d2d8 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Sat, 24 Jan 2026 16:21:49 +0400 Subject: [PATCH 06/11] docs: add comment clarifying that Apache Spark base64 uses padded encoding --- datafusion/spark/src/function/string/base64.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/datafusion/spark/src/function/string/base64.rs b/datafusion/spark/src/function/string/base64.rs index 925d896b7777b..86d907d6ee970 100644 --- a/datafusion/spark/src/function/string/base64.rs +++ b/datafusion/spark/src/function/string/base64.rs @@ -36,6 +36,7 @@ const ENCODING: LazyLock = LazyLock::new(|| { Expr::Literal(ScalarValue::Utf8(Some(String::from("base64pad"))), None) }); +/// Apache Spark base64 uses padded base64 encoding. /// #[derive(Debug, PartialEq, Eq, Hash)] pub struct SparkBase64 { From 2fce59af982f75843006c3c28a29ada35f77412a Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Sat, 24 Jan 2026 16:59:54 +0400 Subject: [PATCH 07/11] feat: enhance base64 encoding support with padded option and simplify constant definition --- datafusion/functions/src/encoding/inner.rs | 26 +++++++++++++++---- .../spark/src/function/string/base64.rs | 11 ++++---- 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/datafusion/functions/src/encoding/inner.rs b/datafusion/functions/src/encoding/inner.rs index baa59b6f85d8a..4ad67b78178f2 100644 --- a/datafusion/functions/src/encoding/inner.rs +++ b/datafusion/functions/src/encoding/inner.rs @@ -404,11 +404,27 @@ impl Encoding { InputBinaryArray: BinaryArrayType<'a>, OutputOffset: OffsetSizeTrait, { - let array: GenericStringArray = array - .iter() - .map(|x| x.map(|x| self.encode_bytes(x))) - .collect(); - Ok(Arc::new(array)) + match self { + Self::Base64 => { + let array: GenericStringArray = array + .iter() + .map(|x| x.map(|x| BASE64_ENGINE.encode(x))) + .collect(); + Ok(Arc::new(array)) + } + Self::Base64Padded => { + let array: GenericStringArray = array + .iter() + .map(|x| x.map(|x| BASE64_ENGINE_PADDED.encode(x))) + .collect(); + Ok(Arc::new(array)) + } + Self::Hex => { + let array: GenericStringArray = + array.iter().map(|x| x.map(hex::encode)).collect(); + Ok(Arc::new(array)) + } + } } // OutputOffset important to ensure Large types output Large arrays diff --git a/datafusion/spark/src/function/string/base64.rs b/datafusion/spark/src/function/string/base64.rs index 86d907d6ee970..0097a2b09fbb9 100644 --- a/datafusion/spark/src/function/string/base64.rs +++ b/datafusion/spark/src/function/string/base64.rs @@ -22,19 +22,17 @@ use arrow::datatypes::DataType; use datafusion_common::arrow::datatypes::{Field, FieldRef}; use datafusion_common::types::{NativeType, logical_string}; use datafusion_common::utils::take_function_args; -use datafusion_common::{Result, ScalarValue, exec_err, internal_err}; +use datafusion_common::{Result, exec_err, internal_err}; use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyContext}; use datafusion_expr::{ - Coercion, Expr, ExprSchemable, ReturnFieldArgs, TypeSignatureClass, + Coercion, Expr, ExprSchemable, ReturnFieldArgs, TypeSignatureClass, lit, }; use datafusion_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, }; use datafusion_functions::expr_fn::{decode, encode}; -const ENCODING: LazyLock = LazyLock::new(|| { - Expr::Literal(ScalarValue::Utf8(Some(String::from("base64pad"))), None) -}); +const ENCODING: LazyLock = LazyLock::new(|| lit("base64pad")); /// Apache Spark base64 uses padded base64 encoding. /// @@ -178,8 +176,9 @@ impl ScalarUDFImpl for SparkUnBase64 { info: &SimplifyContext, ) -> Result { let [str] = take_function_args(self.name(), args)?; + let bin = str.cast_to(&DataType::Binary, info.schema())?; Ok(ExprSimplifyResult::Simplified(decode( - str.cast_to(&DataType::Binary, info.schema())?, + bin, ENCODING.clone(), ))) } From 69fa8e37e08b0358ce79841f1229e1a5a0efd908 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Sat, 24 Jan 2026 17:41:51 +0400 Subject: [PATCH 08/11] refactor: update base64 encoding to use literal for padding and improve error messages in unbase64 function --- .../spark/src/function/string/base64.rs | 23 ++++++++----------- .../test_files/spark/string/base64.slt | 2 +- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/datafusion/spark/src/function/string/base64.rs b/datafusion/spark/src/function/string/base64.rs index 0097a2b09fbb9..9d36de12f6f9f 100644 --- a/datafusion/spark/src/function/string/base64.rs +++ b/datafusion/spark/src/function/string/base64.rs @@ -16,7 +16,7 @@ // under the License. use std::any::Any; -use std::sync::{Arc, LazyLock}; +use std::sync::Arc; use arrow::datatypes::DataType; use datafusion_common::arrow::datatypes::{Field, FieldRef}; @@ -24,16 +24,12 @@ use datafusion_common::types::{NativeType, logical_string}; use datafusion_common::utils::take_function_args; use datafusion_common::{Result, exec_err, internal_err}; use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyContext}; -use datafusion_expr::{ - Coercion, Expr, ExprSchemable, ReturnFieldArgs, TypeSignatureClass, lit, -}; +use datafusion_expr::{Coercion, Expr, ReturnFieldArgs, TypeSignatureClass, lit}; use datafusion_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, }; use datafusion_functions::expr_fn::{decode, encode}; -const ENCODING: LazyLock = LazyLock::new(|| lit("base64pad")); - /// Apache Spark base64 uses padded base64 encoding. /// #[derive(Debug, PartialEq, Eq, Hash)] @@ -104,7 +100,7 @@ impl ScalarUDFImpl for SparkBase64 { let [bin] = take_function_args(self.name(), args)?; Ok(ExprSimplifyResult::Simplified(encode( bin, - ENCODING.clone(), + lit("base64pad"), ))) } } @@ -126,9 +122,9 @@ impl SparkUnBase64 { Self { signature: Signature::coercible( vec![Coercion::new_implicit( - TypeSignatureClass::Native(logical_string()), - vec![TypeSignatureClass::Binary], - NativeType::String, + TypeSignatureClass::Binary, + vec![TypeSignatureClass::Native(logical_string())], + NativeType::Binary, )], Volatility::Immutable, ), @@ -173,13 +169,12 @@ impl ScalarUDFImpl for SparkUnBase64 { fn simplify( &self, args: Vec, - info: &SimplifyContext, + _info: &SimplifyContext, ) -> Result { - let [str] = take_function_args(self.name(), args)?; - let bin = str.cast_to(&DataType::Binary, info.schema())?; + let [bin] = take_function_args(self.name(), args)?; Ok(ExprSimplifyResult::Simplified(decode( bin, - ENCODING.clone(), + lit("base64pad"), ))) } } diff --git a/datafusion/sqllogictest/test_files/spark/string/base64.slt b/datafusion/sqllogictest/test_files/spark/string/base64.slt index 365ff4e313677..769cc23259ed8 100644 --- a/datafusion/sqllogictest/test_files/spark/string/base64.slt +++ b/datafusion/sqllogictest/test_files/spark/string/base64.slt @@ -111,5 +111,5 @@ SELECT unbase64('123'::string); query error Failed to decode value using base64 SELECT unbase64('123'::bytea); -query error Function 'unbase64' requires TypeSignatureClass::Native\(LogicalType\(Native\(String\), String\)\), but received Int32 \(DataType: Int32\). No function matches the given name and argument types 'unbase64\(Int32\)'. +query error Function 'unbase64' requires TypeSignatureClass::Binary, but received Int32 \(DataType: Int32\). No function matches the given name and argument types 'unbase64\(Int32\)'. SELECT unbase64(12::integer); From d6141e1623f5b454931a911bbb47923d5b1cafd7 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Sat, 24 Jan 2026 17:42:26 +0400 Subject: [PATCH 09/11] fix: correct return type mapping for LargeBinary in SparkUnBase64 implementation --- datafusion/spark/src/function/string/base64.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/spark/src/function/string/base64.rs b/datafusion/spark/src/function/string/base64.rs index 9d36de12f6f9f..9866dd88d9558 100644 --- a/datafusion/spark/src/function/string/base64.rs +++ b/datafusion/spark/src/function/string/base64.rs @@ -152,7 +152,7 @@ impl ScalarUDFImpl for SparkUnBase64 { fn return_field_from_args(&self, args: ReturnFieldArgs<'_>) -> Result { let [str] = take_function_args(self.name(), args.arg_fields)?; let return_type = match str.data_type() { - DataType::LargeUtf8 => DataType::LargeBinary, + DataType::LargeBinary => DataType::LargeBinary, _ => DataType::Binary, }; Ok(Arc::new(Field::new( From 275d22b411d81453c0caea45cef9e7b2aa32193b Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Mon, 26 Jan 2026 08:34:23 +0400 Subject: [PATCH 10/11] fix: improve error message for invoke on simplified SparkBase64 function --- datafusion/spark/src/function/string/base64.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/datafusion/spark/src/function/string/base64.rs b/datafusion/spark/src/function/string/base64.rs index 9866dd88d9558..a171d4823b0fa 100644 --- a/datafusion/spark/src/function/string/base64.rs +++ b/datafusion/spark/src/function/string/base64.rs @@ -89,7 +89,10 @@ impl ScalarUDFImpl for SparkBase64 { } fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result { - exec_err!("{} should have been simplified", self.name()) + exec_err!( + "invoke should not be called on a simplified {} function", + self.name() + ) } fn simplify( From 7fe15dfeb93f083dc15900b0508e5df3f41dd3a8 Mon Sep 17 00:00:00 2001 From: Cyprien Huet Date: Mon, 26 Jan 2026 08:36:32 +0400 Subject: [PATCH 11/11] fix: remove redundant error message for base64 and unbase64 functions with Int32 input --- datafusion/sqllogictest/test_files/spark/string/base64.slt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/sqllogictest/test_files/spark/string/base64.slt b/datafusion/sqllogictest/test_files/spark/string/base64.slt index 769cc23259ed8..03b488de0ee9a 100644 --- a/datafusion/sqllogictest/test_files/spark/string/base64.slt +++ b/datafusion/sqllogictest/test_files/spark/string/base64.slt @@ -58,7 +58,7 @@ U3BhcmsgU1E= U3BhcmsgUw== NULL -query error Function 'base64' requires TypeSignatureClass::Binary, but received Int32 \(DataType: Int32\). No function matches the given name and argument types 'base64\(Int32\)'. +query error Function 'base64' requires TypeSignatureClass::Binary, but received Int32 \(DataType: Int32\) SELECT base64(12::integer); @@ -111,5 +111,5 @@ SELECT unbase64('123'::string); query error Failed to decode value using base64 SELECT unbase64('123'::bytea); -query error Function 'unbase64' requires TypeSignatureClass::Binary, but received Int32 \(DataType: Int32\). No function matches the given name and argument types 'unbase64\(Int32\)'. +query error Function 'unbase64' requires TypeSignatureClass::Binary, but received Int32 \(DataType: Int32\) SELECT unbase64(12::integer);