-
Notifications
You must be signed in to change notification settings - Fork 2.1k
feat(spark): add base64 and unbase64 functions
#19968
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
cea76d2
d564f77
e6794ab
7b83254
7c4ab81
d804868
2fce59a
69fa8e3
d6141e1
275d22b
7fe15df
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -52,6 +52,12 @@ const BASE64_ENGINE: GeneralPurpose = GeneralPurpose::new( | |
| .with_decode_padding_mode(DecodePaddingMode::Indifferent), | ||
| ); | ||
|
|
||
| // Generate padding characters when encoding | ||
| const BASE64_ENGINE_PADDED: GeneralPurpose = GeneralPurpose::new( | ||
| &base64::alphabet::STANDARD, | ||
| GeneralPurposeConfig::new().with_encode_padding(true), | ||
| ); | ||
|
|
||
| #[user_doc( | ||
| doc_section(label = "Binary String Functions"), | ||
| description = "Encode binary data into a textual representation.", | ||
|
|
@@ -62,7 +68,7 @@ const BASE64_ENGINE: GeneralPurpose = GeneralPurpose::new( | |
| ), | ||
| argument( | ||
| name = "format", | ||
| description = "Supported formats are: `base64`, `hex`" | ||
| description = "Supported formats are: `base64`, `base64pad`, `hex`" | ||
| ), | ||
| related_udf(name = "decode") | ||
| )] | ||
|
|
@@ -319,12 +325,18 @@ fn decode_array(array: &ArrayRef, encoding: Encoding) -> Result<ColumnarValue> { | |
| #[derive(Debug, Copy, Clone)] | ||
| enum Encoding { | ||
| Base64, | ||
| Base64Padded, | ||
| Hex, | ||
| } | ||
|
|
||
| impl fmt::Display for Encoding { | ||
| fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | ||
| write!(f, "{}", format!("{self:?}").to_lowercase()) | ||
| let name = match self { | ||
| Self::Base64 => "base64", | ||
| Self::Base64Padded => "base64pad", | ||
| Self::Hex => "hex", | ||
| }; | ||
| write!(f, "{name}") | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -345,9 +357,10 @@ impl TryFrom<&ColumnarValue> for Encoding { | |
| }; | ||
| match encoding { | ||
| "base64" => Ok(Self::Base64), | ||
| "base64pad" => Ok(Self::Base64Padded), | ||
| "hex" => Ok(Self::Hex), | ||
| _ => { | ||
| let options = [Self::Base64, Self::Hex] | ||
| let options = [Self::Base64, Self::Base64Padded, Self::Hex] | ||
| .iter() | ||
| .map(|i| i.to_string()) | ||
| .collect::<Vec<_>>() | ||
|
|
@@ -364,15 +377,18 @@ impl Encoding { | |
| fn encode_bytes(self, value: &[u8]) -> String { | ||
| match self { | ||
| Self::Base64 => BASE64_ENGINE.encode(value), | ||
| Self::Base64Padded => BASE64_ENGINE_PADDED.encode(value), | ||
| Self::Hex => hex::encode(value), | ||
| } | ||
| } | ||
|
|
||
| fn decode_bytes(self, value: &[u8]) -> Result<Vec<u8>> { | ||
| match self { | ||
| Self::Base64 => BASE64_ENGINE.decode(value).map_err(|e| { | ||
| exec_datafusion_err!("Failed to decode value using base64: {e}") | ||
| }), | ||
| Self::Base64 | Self::Base64Padded => { | ||
| BASE64_ENGINE.decode(value).map_err(|e| { | ||
| exec_datafusion_err!("Failed to decode value using {self}: {e}") | ||
| }) | ||
| } | ||
| Self::Hex => hex::decode(value).map_err(|e| { | ||
| exec_datafusion_err!("Failed to decode value using hex: {e}") | ||
| }), | ||
|
|
@@ -396,6 +412,13 @@ impl Encoding { | |
| .collect(); | ||
| Ok(Arc::new(array)) | ||
| } | ||
| Self::Base64Padded => { | ||
| let array: GenericStringArray<OutputOffset> = array | ||
| .iter() | ||
| .map(|x| x.map(|x| BASE64_ENGINE_PADDED.encode(x))) | ||
| .collect(); | ||
| Ok(Arc::new(array)) | ||
| } | ||
| Self::Hex => { | ||
| let array: GenericStringArray<OutputOffset> = | ||
| array.iter().map(|x| x.map(hex::encode)).collect(); | ||
|
|
@@ -430,7 +453,7 @@ impl Encoding { | |
| } | ||
|
|
||
| match self { | ||
| Self::Base64 => { | ||
| Self::Base64 | Self::Base64Padded => { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. decoding is the same |
||
| let upper_bound = base64::decoded_len_estimate(approx_data_size); | ||
| delegated_decode::<_, _, OutputOffset>(base64_decode, value, upper_bound) | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,183 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| use std::any::Any; | ||
| use std::sync::Arc; | ||
|
|
||
| use arrow::datatypes::DataType; | ||
| use datafusion_common::arrow::datatypes::{Field, FieldRef}; | ||
| use datafusion_common::types::{NativeType, logical_string}; | ||
| use datafusion_common::utils::take_function_args; | ||
| use datafusion_common::{Result, exec_err, internal_err}; | ||
| use datafusion_expr::simplify::{ExprSimplifyResult, SimplifyContext}; | ||
| use datafusion_expr::{Coercion, Expr, ReturnFieldArgs, TypeSignatureClass, lit}; | ||
| use datafusion_expr::{ | ||
| ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, | ||
| }; | ||
| use datafusion_functions::expr_fn::{decode, encode}; | ||
|
|
||
| /// Apache Spark base64 uses padded base64 encoding. | ||
| /// <https://spark.apache.org/docs/latest/api/sql/index.html#base64> | ||
| #[derive(Debug, PartialEq, Eq, Hash)] | ||
| pub struct SparkBase64 { | ||
| signature: Signature, | ||
| } | ||
|
|
||
| impl Default for SparkBase64 { | ||
| fn default() -> Self { | ||
| Self::new() | ||
| } | ||
| } | ||
|
|
||
| impl SparkBase64 { | ||
| pub fn new() -> Self { | ||
| Self { | ||
| signature: Signature::coercible( | ||
| vec![Coercion::new_implicit( | ||
| TypeSignatureClass::Binary, | ||
| vec![TypeSignatureClass::Native(logical_string())], | ||
| NativeType::Binary, | ||
| )], | ||
| Volatility::Immutable, | ||
| ), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl ScalarUDFImpl for SparkBase64 { | ||
| fn as_any(&self) -> &dyn Any { | ||
| self | ||
| } | ||
|
|
||
| fn name(&self) -> &str { | ||
| "base64" | ||
| } | ||
|
|
||
| fn signature(&self) -> &Signature { | ||
| &self.signature | ||
| } | ||
|
|
||
| fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> { | ||
| internal_err!("return_type should not be called for {}", self.name()) | ||
| } | ||
|
|
||
| fn return_field_from_args(&self, args: ReturnFieldArgs<'_>) -> Result<FieldRef> { | ||
| let [bin] = take_function_args(self.name(), args.arg_fields)?; | ||
| let return_type = match bin.data_type() { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. matching return type with DF encode function |
||
| DataType::LargeBinary => DataType::LargeUtf8, | ||
| _ => DataType::Utf8, | ||
| }; | ||
| Ok(Arc::new(Field::new( | ||
| self.name(), | ||
| return_type, | ||
| bin.is_nullable(), | ||
| ))) | ||
| } | ||
|
|
||
| fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> { | ||
| exec_err!( | ||
| "invoke should not be called on a simplified {} function", | ||
| self.name() | ||
| ) | ||
| } | ||
|
|
||
| fn simplify( | ||
| &self, | ||
| args: Vec<Expr>, | ||
| _info: &SimplifyContext, | ||
| ) -> Result<ExprSimplifyResult> { | ||
| let [bin] = take_function_args(self.name(), args)?; | ||
| Ok(ExprSimplifyResult::Simplified(encode( | ||
| bin, | ||
| lit("base64pad"), | ||
| ))) | ||
| } | ||
| } | ||
|
|
||
| /// <https://spark.apache.org/docs/latest/api/sql/index.html#unbase64> | ||
| #[derive(Debug, PartialEq, Eq, Hash)] | ||
| pub struct SparkUnBase64 { | ||
| signature: Signature, | ||
| } | ||
|
|
||
| impl Default for SparkUnBase64 { | ||
| fn default() -> Self { | ||
| Self::new() | ||
| } | ||
| } | ||
|
|
||
| impl SparkUnBase64 { | ||
| pub fn new() -> Self { | ||
| Self { | ||
| signature: Signature::coercible( | ||
| vec![Coercion::new_implicit( | ||
| TypeSignatureClass::Binary, | ||
| vec![TypeSignatureClass::Native(logical_string())], | ||
| NativeType::Binary, | ||
| )], | ||
| Volatility::Immutable, | ||
| ), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl ScalarUDFImpl for SparkUnBase64 { | ||
| fn as_any(&self) -> &dyn Any { | ||
| self | ||
| } | ||
|
|
||
| fn name(&self) -> &str { | ||
| "unbase64" | ||
| } | ||
|
|
||
| fn signature(&self) -> &Signature { | ||
| &self.signature | ||
| } | ||
|
|
||
| fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> { | ||
| internal_err!("return_type should not be called for {}", self.name()) | ||
| } | ||
|
|
||
| fn return_field_from_args(&self, args: ReturnFieldArgs<'_>) -> Result<FieldRef> { | ||
| let [str] = take_function_args(self.name(), args.arg_fields)?; | ||
| let return_type = match str.data_type() { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. matching return type with DF decode function |
||
| DataType::LargeBinary => DataType::LargeBinary, | ||
| _ => DataType::Binary, | ||
| }; | ||
| Ok(Arc::new(Field::new( | ||
| self.name(), | ||
| return_type, | ||
| str.is_nullable(), | ||
| ))) | ||
| } | ||
|
|
||
| fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> { | ||
| exec_err!("{} should have been simplified", self.name()) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if it fires, the error message would be highly confusing IMO |
||
| } | ||
|
|
||
| fn simplify( | ||
| &self, | ||
| args: Vec<Expr>, | ||
| _info: &SimplifyContext, | ||
| ) -> Result<ExprSimplifyResult> { | ||
| let [bin] = take_function_args(self.name(), args)?; | ||
| Ok(ExprSimplifyResult::Simplified(decode( | ||
| bin, | ||
| lit("base64pad"), | ||
| ))) | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this benchmark was broken beacuse of input/output type mismatch