From db361ae386aa950e88117758f754e0545a291fc8 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 10 Aug 2024 14:15:50 -0600 Subject: [PATCH 01/25] add skeleton for StructsToJson --- docs/source/user-guide/configs.md | 2 +- .../core/src/execution/datafusion/planner.rs | 11 +- native/proto/src/proto/expr.proto | 7 ++ native/spark-expr/src/lib.rs | 2 + native/spark-expr/src/to_json.rs | 103 ++++++++++++++++++ .../apache/comet/serde/QueryPlanSerde.scala | 20 ++++ .../apache/comet/CometExpressionSuite.scala | 12 ++ 7 files changed, 152 insertions(+), 5 deletions(-) create mode 100644 native/spark-expr/src/to_json.rs diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 446ed73340..7d3f2a663b 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -63,8 +63,8 @@ Comet provides the following configuration settings. | spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false | | spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte buffer when reading Parquet. By default, this is false | false | | spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway using Rust's regular expression engine. See compatibility guide for more information. | false | -| spark.comet.sparkToColumnar.supportedOperatorList | A comma-separated list of operators that will be converted to Comet columnar format when 'spark.comet.sparkToColumnar.enabled' is true | Range,InMemoryTableScan | | spark.comet.scan.enabled | Whether to enable Comet scan. When this is turned on, Spark will use Comet to read Parquet data source. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. By default, this config is true. | true | | spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature of CometScan. By default is disabled. | false | | spark.comet.scan.preFetch.threadNum | The number of threads running pre-fetching for CometScan. Effective if spark.comet.scan.preFetch.enabled is enabled. By default it is 2. Note that more pre-fetching threads means more memory requirement to store pre-fetched row groups. | 2 | | spark.comet.shuffle.preferDictionary.ratio | The ratio of total values to distinct values in a string column to decide whether to prefer dictionary encoding when shuffling the column. If the ratio is higher than this config, dictionary encoding will be used on shuffling string column. This config is effective if it is higher than 1.0. By default, this config is 10.0. Note that this config is only used when `spark.comet.exec.shuffle.mode` is `jvm`. | 10.0 | +| spark.comet.sparkToColumnar.supportedOperatorList | A comma-separated list of operators that will be converted to Comet columnar format when 'spark.comet.sparkToColumnar.enabled' is true | Range,InMemoryTableScan | diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index e1d65a4b2d..5eac2c2593 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -93,10 +93,7 @@ use datafusion_comet_proto::{ }, spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning}, }; -use datafusion_comet_spark_expr::{ - Cast, CreateNamedStruct, DateTruncExpr, GetStructField, HourExpr, IfExpr, MinuteExpr, RLike, - SecondExpr, TimestampTruncExpr, -}; +use datafusion_comet_spark_expr::{Cast, CreateNamedStruct, DateTruncExpr, GetStructField, HourExpr, IfExpr, MinuteExpr, RLike, SecondExpr, TimestampTruncExpr, ToJson}; use datafusion_common::scalar::ScalarStructBuilder; use datafusion_common::{ tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter}, @@ -625,6 +622,12 @@ impl PhysicalPlanner { let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema.clone())?; Ok(Arc::new(GetStructField::new(child, expr.ordinal as usize))) } + ExprStruct::ToJson(expr) => { + let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?; + Ok(Arc::new(ToJson::new( + child, + ))) + } expr => Err(ExecutionError::GeneralError(format!( "Not implemented: {:?}", expr diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto index 50ab8f5142..529d8dd30e 100644 --- a/native/proto/src/proto/expr.proto +++ b/native/proto/src/proto/expr.proto @@ -79,6 +79,7 @@ message Expr { BloomFilterMightContain bloom_filter_might_contain = 52; CreateNamedStruct create_named_struct = 53; GetStructField get_struct_field = 54; + ToJson to_json = 55; } } @@ -343,6 +344,12 @@ message StringSpace { Expr child = 1; } +message ToJson { + Expr child = 1; + // TODO timezone + // TODO options +} + message Hour { Expr child = 1; string timezone = 2; diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs index cf7fc872b6..33dd5bd222 100644 --- a/native/spark-expr/src/lib.rs +++ b/native/spark-expr/src/lib.rs @@ -26,10 +26,12 @@ pub mod spark_hash; mod structs; mod temporal; pub mod timezone; +mod to_json; pub mod utils; mod xxhash64; pub use cast::{spark_cast, Cast}; +pub use to_json::ToJson; pub use error::{SparkError, SparkResult}; pub use if_expr::IfExpr; pub use regexp::RLike; diff --git a/native/spark-expr/src/to_json.rs b/native/spark-expr/src/to_json.rs new file mode 100644 index 0000000000..48c4061d5c --- /dev/null +++ b/native/spark-expr/src/to_json.rs @@ -0,0 +1,103 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// TODO upstream this to DataFusion as long as we have a way to specify all +// of the Spark-specific compatibility features that we need + +use arrow_array::{Array, ArrayRef, RecordBatch}; +use arrow_schema::{DataType, Schema}; +use datafusion_common::Result; +use datafusion_expr::ColumnarValue; +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; +use std::any::Any; +use std::fmt::{Debug, Display, Formatter}; +use std::hash::{Hash, Hasher}; +use std::sync::Arc; + +/// to_json function +#[derive(Debug, Hash)] +pub struct ToJson { + /// The input to convert to JSON + expr: Arc, //TODO options such as null handling +} + +impl ToJson { + pub fn new(expr: Arc) -> Self { + Self { expr } + } +} + +impl Display for ToJson { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + // TODO options + write!(f, "to_json({})", self.expr) + } +} + +impl PartialEq for ToJson { + fn eq(&self, other: &dyn Any) -> bool { + if let Some(other) = other.downcast_ref::() { + //TODO compare options + self.expr.eq(&other.expr) + } else { + false + } + } +} + +impl PhysicalExpr for ToJson { + fn as_any(&self) -> &dyn Any { + self + } + + fn data_type(&self, _: &Schema) -> Result { + Ok(DataType::Utf8) + } + + fn nullable(&self, input_schema: &Schema) -> Result { + self.expr.nullable(input_schema) + } + + fn evaluate(&self, batch: &RecordBatch) -> Result { + let input = self.expr.evaluate(batch)?.into_array(batch.num_rows())?; + Ok(ColumnarValue::Array(to_json(input.as_ref())?)) + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.expr] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + assert!(children.len() == 1); + // TODO options + Ok(Arc::new(Self::new(children[0].clone()))) + } + + fn dyn_hash(&self, state: &mut dyn Hasher) { + // TODO options + let mut s = state; + self.expr.hash(&mut s); + self.hash(&mut s); + } +} + +fn to_json(array: &dyn Array) -> Result { + todo!() +} diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 980b398234..27f526358e 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1210,6 +1210,26 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim None } + case StructsToJson(options, child, timezoneId) => + if (options.nonEmpty) { + withInfo(expr, "StructsToJson with options is not supported") + None + } else { + // TODO check for supported data types + exprToProto(child, input, binding) match { + case Some(p) => + val toJson = ExprOuterClass.ToJson.newBuilder().setChild(p).build() + Some( + ExprOuterClass.Expr + .newBuilder() + .setToJson(toJson) + .build()) + case _ => + withInfo(expr, child) + None + } + } + case Like(left, right, escapeChar) => if (escapeChar == '\\') { val leftExpr = exprToProtoInternal(left, inputs) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 8eeb998bd1..5dbe10454b 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1957,6 +1957,18 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } + test("to_json with struct") { + Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000) + withParquetTable(path.toString, "tbl") { + checkSparkAnswerAndOperator("SELECT to_json(named_struct('a', _1, 'b', _2)) FROM tbl") + } + } + } + } + test("struct and named_struct with dictionary") { Seq(true, false).foreach { dictionaryEnabled => withParquetTable( From 9f8cab171bc5fcd7c660fecd2e6a7be7698c61d9 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 10 Aug 2024 14:57:52 -0600 Subject: [PATCH 02/25] first test passes --- .../core/src/execution/datafusion/planner.rs | 9 +- native/spark-expr/src/lib.rs | 2 +- native/spark-expr/src/to_json.rs | 115 ++++++++++++++++-- 3 files changed, 111 insertions(+), 15 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 5eac2c2593..3ebb742aae 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -93,7 +93,10 @@ use datafusion_comet_proto::{ }, spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning}, }; -use datafusion_comet_spark_expr::{Cast, CreateNamedStruct, DateTruncExpr, GetStructField, HourExpr, IfExpr, MinuteExpr, RLike, SecondExpr, TimestampTruncExpr, ToJson}; +use datafusion_comet_spark_expr::{ + Cast, CreateNamedStruct, DateTruncExpr, GetStructField, HourExpr, IfExpr, MinuteExpr, RLike, + SecondExpr, TimestampTruncExpr, ToJson, +}; use datafusion_common::scalar::ScalarStructBuilder; use datafusion_common::{ tree_node::{Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter}, @@ -624,9 +627,7 @@ impl PhysicalPlanner { } ExprStruct::ToJson(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?; - Ok(Arc::new(ToJson::new( - child, - ))) + Ok(Arc::new(ToJson::new(child))) } expr => Err(ExecutionError::GeneralError(format!( "Not implemented: {:?}", diff --git a/native/spark-expr/src/lib.rs b/native/spark-expr/src/lib.rs index 33dd5bd222..3170712526 100644 --- a/native/spark-expr/src/lib.rs +++ b/native/spark-expr/src/lib.rs @@ -31,12 +31,12 @@ pub mod utils; mod xxhash64; pub use cast::{spark_cast, Cast}; -pub use to_json::ToJson; pub use error::{SparkError, SparkResult}; pub use if_expr::IfExpr; pub use regexp::RLike; pub use structs::{CreateNamedStruct, GetStructField}; pub use temporal::{DateTruncExpr, HourExpr, MinuteExpr, SecondExpr, TimestampTruncExpr}; +pub use to_json::ToJson; /// Spark supports three evaluation modes when evaluating expressions, which affect /// the behavior when processing input values that are invalid or would result in an diff --git a/native/spark-expr/src/to_json.rs b/native/spark-expr/src/to_json.rs index 48c4061d5c..b60f000a3d 100644 --- a/native/spark-expr/src/to_json.rs +++ b/native/spark-expr/src/to_json.rs @@ -16,11 +16,14 @@ // under the License. // TODO upstream this to DataFusion as long as we have a way to specify all -// of the Spark-specific compatibility features that we need +// of the Spark-specific compatibility features that we need (including +// being able to specify Spark-compatible cast from all types to string) -use arrow_array::{Array, ArrayRef, RecordBatch}; +use crate::{spark_cast, EvalMode}; +use arrow_array::builder::StringBuilder; +use arrow_array::{Array, RecordBatch, StringArray, StructArray}; use arrow_schema::{DataType, Schema}; -use datafusion_common::Result; +use datafusion_common::{DataFusionError, Result}; use datafusion_expr::ColumnarValue; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use std::any::Any; @@ -43,7 +46,7 @@ impl ToJson { impl Display for ToJson { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - // TODO options + // TODO options & timezone write!(f, "to_json({})", self.expr) } } @@ -51,7 +54,7 @@ impl Display for ToJson { impl PartialEq for ToJson { fn eq(&self, other: &dyn Any) -> bool { if let Some(other) = other.downcast_ref::() { - //TODO compare options + //TODO compare options & timezone self.expr.eq(&other.expr) } else { false @@ -74,7 +77,13 @@ impl PhysicalExpr for ToJson { fn evaluate(&self, batch: &RecordBatch) -> Result { let input = self.expr.evaluate(batch)?.into_array(batch.num_rows())?; - Ok(ColumnarValue::Array(to_json(input.as_ref())?)) + if let Some(struct_array) = input.as_any().downcast_ref::() { + Ok(ColumnarValue::Array(Arc::new(struct_to_json( + struct_array, + )?))) + } else { + todo!() + } } fn children(&self) -> Vec<&Arc> { @@ -86,18 +95,104 @@ impl PhysicalExpr for ToJson { children: Vec>, ) -> Result> { assert!(children.len() == 1); - // TODO options + // TODO options & timezone Ok(Arc::new(Self::new(children[0].clone()))) } fn dyn_hash(&self, state: &mut dyn Hasher) { - // TODO options + // TODO options & timezone let mut s = state; self.expr.hash(&mut s); self.hash(&mut s); } } -fn to_json(array: &dyn Array) -> Result { - todo!() +fn struct_to_json(array: &StructArray) -> Result { + // create string representation of each column first + let string_arrays: Vec> = array + .columns() + .iter() + .map(|arr| { + spark_cast( + ColumnarValue::Array(arr.clone()), + &DataType::Utf8, + EvalMode::Legacy, + "UTC".to_string(), // TODO Remove hard-coded timezone + ) + .and_then(|casted_value| casted_value.into_array(array.len())) + .and_then(|array_ref| { + array_ref + .as_any() + .downcast_ref::() + .map(|string_array| Arc::new(string_array.clone())) + .ok_or_else(|| DataFusionError::Execution("Expected StringArray".to_string())) + }) + }) + .collect::>>()?; + let field_names: Vec = array.fields().iter().map(|f| f.name().clone()).collect(); + + let mut builder = StringBuilder::with_capacity(array.len(), array.len() * 64); + for row_index in 0..array.len() { + if array.is_null(row_index) { + builder.append_null(); + } else { + let mut json = String::new(); + let mut any_fields_written = false; + json.push('{'); + for col_index in 0..string_arrays.len() { + if !string_arrays[col_index].is_null(row_index) { + if any_fields_written { + json.push(','); + } + // quoted field name + json.push('"'); + json.push_str(&field_names[col_index]); + json.push_str("\":"); + // value + json.push_str(string_arrays[col_index].value(row_index)); + any_fields_written = true; + } + } + json.push('}'); + builder.append_value(json); + } + } + Ok(builder.finish()) +} + +#[cfg(test)] +mod test { + use crate::to_json::struct_to_json; + use arrow_array::Array; + use arrow_array::{ArrayRef, BooleanArray, Int32Array, StructArray}; + use arrow_schema::{DataType, Field}; + use datafusion_common::Result; + use std::sync::Arc; + + #[test] + fn test_primitives() -> Result<()> { + let bools: ArrayRef = Arc::new(BooleanArray::from(vec![ + None, + Some(true), + Some(false), + Some(false), + ])); + let ints: ArrayRef = Arc::new(Int32Array::from(vec![ + Some(123), + None, + Some(i32::MAX), + Some(i32::MIN), + ])); + let struct_array = StructArray::from(vec![ + (Arc::new(Field::new("a", DataType::Boolean, true)), bools), + (Arc::new(Field::new("b", DataType::Int32, true)), ints), + ]); + let json = struct_to_json(&struct_array)?; + assert_eq!(4, json.len()); + assert_eq!(r#"{"b":123}"#, json.value(0)); + assert_eq!(r#"{"a":true}"#, json.value(1)); + assert_eq!(r#"{"a":false,"b":2147483647}"#, json.value(2)); + assert_eq!(r#"{"a":false,"b":-2147483648}"#, json.value(3)); + Ok(()) + } } From 65be1d474523408cdd3bb7d9d4121b8b50959f9e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 10 Aug 2024 16:19:41 -0600 Subject: [PATCH 03/25] add support for nested structs --- .../core/src/execution/datafusion/planner.rs | 2 +- native/proto/src/proto/expr.proto | 2 +- native/spark-expr/src/cast.rs | 4 +- native/spark-expr/src/to_json.rs | 187 ++++++++++++------ .../apache/comet/serde/QueryPlanSerde.scala | 7 +- 5 files changed, 140 insertions(+), 62 deletions(-) diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs index 3ebb742aae..4f9e11ce07 100644 --- a/native/core/src/execution/datafusion/planner.rs +++ b/native/core/src/execution/datafusion/planner.rs @@ -627,7 +627,7 @@ impl PhysicalPlanner { } ExprStruct::ToJson(expr) => { let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?; - Ok(Arc::new(ToJson::new(child))) + Ok(Arc::new(ToJson::new(child, &expr.timezone))) } expr => Err(ExecutionError::GeneralError(format!( "Not implemented: {:?}", diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto index 529d8dd30e..815541fcc2 100644 --- a/native/proto/src/proto/expr.proto +++ b/native/proto/src/proto/expr.proto @@ -346,7 +346,7 @@ message StringSpace { message ToJson { Expr child = 1; - // TODO timezone + string timezone = 2; // TODO options } diff --git a/native/spark-expr/src/cast.rs b/native/spark-expr/src/cast.rs index e44b1c9f5d..4a7503790f 100644 --- a/native/spark-expr/src/cast.rs +++ b/native/spark-expr/src/cast.rs @@ -575,7 +575,7 @@ pub fn spark_cast( arg: ColumnarValue, data_type: &DataType, eval_mode: EvalMode, - timezone: String, + timezone: &str, ) -> DataFusionResult { match arg { ColumnarValue::Array(array) => Ok(ColumnarValue::Array(cast_array( @@ -1380,7 +1380,7 @@ impl PhysicalExpr for Cast { fn evaluate(&self, batch: &RecordBatch) -> DataFusionResult { let arg = self.child.evaluate(batch)?; - spark_cast(arg, &self.data_type, self.eval_mode, self.timezone.clone()) + spark_cast(arg, &self.data_type, self.eval_mode, &self.timezone) } fn children(&self) -> Vec<&Arc> { diff --git a/native/spark-expr/src/to_json.rs b/native/spark-expr/src/to_json.rs index b60f000a3d..652f489fef 100644 --- a/native/spark-expr/src/to_json.rs +++ b/native/spark-expr/src/to_json.rs @@ -21,9 +21,9 @@ use crate::{spark_cast, EvalMode}; use arrow_array::builder::StringBuilder; -use arrow_array::{Array, RecordBatch, StringArray, StructArray}; +use arrow_array::{Array, ArrayRef, RecordBatch, StringArray, StructArray}; use arrow_schema::{DataType, Schema}; -use datafusion_common::{DataFusionError, Result}; +use datafusion_common::Result; use datafusion_expr::ColumnarValue; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use std::any::Any; @@ -35,27 +35,32 @@ use std::sync::Arc; #[derive(Debug, Hash)] pub struct ToJson { /// The input to convert to JSON - expr: Arc, //TODO options such as null handling + expr: Arc, + /// Timezone to use when converting timestamps to JSON + timezone: String, } impl ToJson { - pub fn new(expr: Arc) -> Self { - Self { expr } + pub fn new(expr: Arc, timezone: &str) -> Self { + Self { + expr, + timezone: timezone.to_owned(), + } } } impl Display for ToJson { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - // TODO options & timezone - write!(f, "to_json({})", self.expr) + // TODO options + write!(f, "to_json({}, timezone={})", self.expr, self.timezone) } } impl PartialEq for ToJson { fn eq(&self, other: &dyn Any) -> bool { if let Some(other) = other.downcast_ref::() { - //TODO compare options & timezone - self.expr.eq(&other.expr) + //TODO compare options + self.expr.eq(&other.expr) && self.timezone.eq(&other.timezone) } else { false } @@ -77,13 +82,10 @@ impl PhysicalExpr for ToJson { fn evaluate(&self, batch: &RecordBatch) -> Result { let input = self.expr.evaluate(batch)?.into_array(batch.num_rows())?; - if let Some(struct_array) = input.as_any().downcast_ref::() { - Ok(ColumnarValue::Array(Arc::new(struct_to_json( - struct_array, - )?))) - } else { - todo!() - } + Ok(ColumnarValue::Array(array_to_json_string( + &input, + &self.timezone, + )?)) } fn children(&self) -> Vec<&Arc> { @@ -95,48 +97,59 @@ impl PhysicalExpr for ToJson { children: Vec>, ) -> Result> { assert!(children.len() == 1); - // TODO options & timezone - Ok(Arc::new(Self::new(children[0].clone()))) + // TODO options + Ok(Arc::new(Self::new(children[0].clone(), &self.timezone))) } fn dyn_hash(&self, state: &mut dyn Hasher) { - // TODO options & timezone + // TODO options let mut s = state; self.expr.hash(&mut s); + self.timezone.hash(&mut s); self.hash(&mut s); } } -fn struct_to_json(array: &StructArray) -> Result { - // create string representation of each column first - let string_arrays: Vec> = array +/// Convert an array into a JSON value string representation +fn array_to_json_string(arr: &Arc, timezone: &str) -> Result { + if let Some(struct_array) = arr.as_any().downcast_ref::() { + struct_to_json(struct_array, timezone) + } else { + spark_cast( + ColumnarValue::Array(Arc::clone(arr)), + &DataType::Utf8, + EvalMode::Legacy, + timezone, + )? + .into_array(arr.len()) + } +} + +fn struct_to_json(array: &StructArray, timezone: &str) -> Result { + // get field names + let field_names: Vec = array.fields().iter().map(|f| f.name().clone()).collect(); + // create JSON string representation of each column + let string_arrays: Vec = array .columns() + .iter() + .map(|arr| array_to_json_string(arr, timezone)) + .collect::>>()?; + let string_arrays: Vec<&StringArray> = string_arrays .iter() .map(|arr| { - spark_cast( - ColumnarValue::Array(arr.clone()), - &DataType::Utf8, - EvalMode::Legacy, - "UTC".to_string(), // TODO Remove hard-coded timezone - ) - .and_then(|casted_value| casted_value.into_array(array.len())) - .and_then(|array_ref| { - array_ref - .as_any() - .downcast_ref::() - .map(|string_array| Arc::new(string_array.clone())) - .ok_or_else(|| DataFusionError::Execution("Expected StringArray".to_string())) - }) + arr.as_any() + .downcast_ref::() + .expect("string array") }) - .collect::>>()?; - let field_names: Vec = array.fields().iter().map(|f| f.name().clone()).collect(); - - let mut builder = StringBuilder::with_capacity(array.len(), array.len() * 64); + .collect(); + // build the JSON string containing entries in the format `"field_name":field_value` + let mut builder = StringBuilder::with_capacity(array.len(), array.len() * 16); + let mut json = String::with_capacity(array.len() * 16); for row_index in 0..array.len() { if array.is_null(row_index) { builder.append_null(); } else { - let mut json = String::new(); + json.clear(); let mut any_fields_written = false; json.push('{'); for col_index in 0..string_arrays.len() { @@ -154,16 +167,18 @@ fn struct_to_json(array: &StructArray) -> Result { } } json.push('}'); - builder.append_value(json); + // TODO how to pass slice here instead of cloning the string? + builder.append_value(json.clone()); } } - Ok(builder.finish()) + Ok(Arc::new(builder.finish())) } #[cfg(test)] mod test { use crate::to_json::struct_to_json; - use arrow_array::Array; + use arrow_array::types::Int32Type; + use arrow_array::{Array, PrimitiveArray, StringArray}; use arrow_array::{ArrayRef, BooleanArray, Int32Array, StructArray}; use arrow_schema::{DataType, Field}; use datafusion_common::Result; @@ -171,23 +186,17 @@ mod test { #[test] fn test_primitives() -> Result<()> { - let bools: ArrayRef = Arc::new(BooleanArray::from(vec![ - None, - Some(true), - Some(false), - Some(false), - ])); - let ints: ArrayRef = Arc::new(Int32Array::from(vec![ - Some(123), - None, - Some(i32::MAX), - Some(i32::MIN), - ])); + let bools: ArrayRef = create_bools(); + let ints: ArrayRef = create_ints(); let struct_array = StructArray::from(vec![ (Arc::new(Field::new("a", DataType::Boolean, true)), bools), (Arc::new(Field::new("b", DataType::Int32, true)), ints), ]); - let json = struct_to_json(&struct_array)?; + let json = struct_to_json(&struct_array, "UTC")?; + let json = json + .as_any() + .downcast_ref::() + .expect("string array"); assert_eq!(4, json.len()); assert_eq!(r#"{"b":123}"#, json.value(0)); assert_eq!(r#"{"a":true}"#, json.value(1)); @@ -195,4 +204,68 @@ mod test { assert_eq!(r#"{"a":false,"b":-2147483648}"#, json.value(3)); Ok(()) } + + #[test] + fn test_nested_struct() -> Result<()> { + let bools: ArrayRef = create_bools(); + let ints: ArrayRef = create_ints(); + + // create first struct array + let struct_fields = vec![ + Arc::new(Field::new("a", DataType::Boolean, true)), + Arc::new(Field::new("b", DataType::Int32, true)), + ]; + let struct_values = vec![bools, ints]; + let struct_array = StructArray::from( + struct_fields + .clone() + .into_iter() + .zip(struct_values) + .collect::>(), + ); + + // create second struct array containing the first struct array + let struct_fields2 = vec![Arc::new(Field::new( + "a", + DataType::Struct(struct_fields.into()), + true, + ))]; + let struct_values2: Vec = vec![Arc::new(struct_array.clone())]; + let struct_array2 = StructArray::from( + struct_fields2 + .into_iter() + .zip(struct_values2) + .collect::>(), + ); + + let json = struct_to_json(&struct_array2, "UTC")?; + let json = json + .as_any() + .downcast_ref::() + .expect("string array"); + assert_eq!(4, json.len()); + assert_eq!(r#"{"a":{"b":123}}"#, json.value(0)); + assert_eq!(r#"{"a":{"a":true}}"#, json.value(1)); + assert_eq!(r#"{"a":{"a":false,"b":2147483647}}"#, json.value(2)); + assert_eq!(r#"{"a":{"a":false,"b":-2147483648}}"#, json.value(3)); + Ok(()) + } + + fn create_ints() -> Arc> { + Arc::new(Int32Array::from(vec![ + Some(123), + None, + Some(i32::MAX), + Some(i32::MIN), + ])) + } + + fn create_bools() -> Arc { + Arc::new(BooleanArray::from(vec![ + None, + Some(true), + Some(false), + Some(false), + ])) + } } diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 27f526358e..3f81b14f05 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1216,9 +1216,14 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim None } else { // TODO check for supported data types + // TODO check for supported timezone exprToProto(child, input, binding) match { case Some(p) => - val toJson = ExprOuterClass.ToJson.newBuilder().setChild(p).build() + val toJson = ExprOuterClass.ToJson + .newBuilder() + .setChild(p) + .setTimezone(timezoneId.getOrElse("UTC")) + .build() Some( ExprOuterClass.Expr .newBuilder() From 779344ef008408ce86cd31d7422213621296a30e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 10 Aug 2024 16:27:39 -0600 Subject: [PATCH 04/25] add support for strings and improve test --- native/spark-expr/src/to_json.rs | 36 +++++++++++++++++-- .../apache/comet/CometExpressionSuite.scala | 18 ++++++---- 2 files changed, 44 insertions(+), 10 deletions(-) diff --git a/native/spark-expr/src/to_json.rs b/native/spark-expr/src/to_json.rs index 652f489fef..52d65b6be8 100644 --- a/native/spark-expr/src/to_json.rs +++ b/native/spark-expr/src/to_json.rs @@ -128,6 +128,19 @@ fn array_to_json_string(arr: &Arc, timezone: &str) -> Result Result { // get field names let field_names: Vec = array.fields().iter().map(|f| f.name().clone()).collect(); + // determine which fields need to have their values quoted + let quotes_needed: Vec = array + .fields() + .iter() + .map(|f| match f.data_type() { + DataType::Utf8 | DataType::LargeUtf8 => true, + DataType::Dictionary(_, dt) => match dt.as_ref() { + DataType::Utf8 | DataType::LargeUtf8 => true, + _ => false, + }, + _ => false, + }) + .collect(); // create JSON string representation of each column let string_arrays: Vec = array .columns() @@ -162,7 +175,13 @@ fn struct_to_json(array: &StructArray, timezone: &str) -> Result { json.push_str(&field_names[col_index]); json.push_str("\":"); // value + if quotes_needed[col_index] { + json.push('"'); + } json.push_str(string_arrays[col_index].value(row_index)); + if quotes_needed[col_index] { + json.push('"'); + } any_fields_written = true; } } @@ -188,9 +207,11 @@ mod test { fn test_primitives() -> Result<()> { let bools: ArrayRef = create_bools(); let ints: ArrayRef = create_ints(); + let strings: ArrayRef = create_strings(); let struct_array = StructArray::from(vec![ (Arc::new(Field::new("a", DataType::Boolean, true)), bools), (Arc::new(Field::new("b", DataType::Int32, true)), ints), + (Arc::new(Field::new("c", DataType::Utf8, true)), strings), ]); let json = struct_to_json(&struct_array, "UTC")?; let json = json @@ -199,9 +220,9 @@ mod test { .expect("string array"); assert_eq!(4, json.len()); assert_eq!(r#"{"b":123}"#, json.value(0)); - assert_eq!(r#"{"a":true}"#, json.value(1)); - assert_eq!(r#"{"a":false,"b":2147483647}"#, json.value(2)); - assert_eq!(r#"{"a":false,"b":-2147483648}"#, json.value(3)); + assert_eq!(r#"{"a":true,"c":"foo"}"#, json.value(1)); + assert_eq!(r#"{"a":false,"b":2147483647,"c":"bar"}"#, json.value(2)); + assert_eq!(r#"{"a":false,"b":-2147483648,"c":""}"#, json.value(3)); Ok(()) } @@ -268,4 +289,13 @@ mod test { Some(false), ])) } + + fn create_strings() -> Arc { + Arc::new(StringArray::from(vec![ + None, + Some("foo"), + Some("bar"), + Some(""), + ])) + } } diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 5dbe10454b..9fc28671d9 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1957,14 +1957,18 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } - test("to_json with struct") { + test("to_json") { Seq(true, false).foreach { dictionaryEnabled => - withTempDir { dir => - val path = new Path(dir.toURI.toString, "test.parquet") - makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, 10000) - withParquetTable(path.toString, "tbl") { - checkSparkAnswerAndOperator("SELECT to_json(named_struct('a', _1, 'b', _2)) FROM tbl") - } + withParquetTable( + (0 until 100).map(i => + ( + i, + if (i % 2 == 0) { "even" } + else { "odd" })), + "tbl", + withDictionary = dictionaryEnabled) { + checkSparkAnswerAndOperator("SELECT to_json(named_struct('a', _1, 'b', _2)) FROM tbl") + checkSparkAnswerAndOperator("SELECT to_json(named_struct('nested', named_struct('a', _1, 'b', _2))) FROM tbl") } } } From e769d2f7068831e70ecb63e00d8769d51351f1c6 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 10 Aug 2024 23:04:51 -0600 Subject: [PATCH 05/25] clippy --- native/spark-expr/src/to_json.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/native/spark-expr/src/to_json.rs b/native/spark-expr/src/to_json.rs index 52d65b6be8..cab1a2a134 100644 --- a/native/spark-expr/src/to_json.rs +++ b/native/spark-expr/src/to_json.rs @@ -134,10 +134,7 @@ fn struct_to_json(array: &StructArray, timezone: &str) -> Result { .iter() .map(|f| match f.data_type() { DataType::Utf8 | DataType::LargeUtf8 => true, - DataType::Dictionary(_, dt) => match dt.as_ref() { - DataType::Utf8 | DataType::LargeUtf8 => true, - _ => false, - }, + DataType::Dictionary(_, dt) => matches!(dt.as_ref(), DataType::Utf8 | DataType::LargeUtf8), _ => false, }) .collect(); From 5eccf37c05a28020bb8daaf325a8fabad15ed763 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 10 Aug 2024 23:17:44 -0600 Subject: [PATCH 06/25] format --- native/spark-expr/src/to_json.rs | 4 +++- .../test/scala/org/apache/comet/CometExpressionSuite.scala | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/native/spark-expr/src/to_json.rs b/native/spark-expr/src/to_json.rs index cab1a2a134..2abd0e2a77 100644 --- a/native/spark-expr/src/to_json.rs +++ b/native/spark-expr/src/to_json.rs @@ -134,7 +134,9 @@ fn struct_to_json(array: &StructArray, timezone: &str) -> Result { .iter() .map(|f| match f.data_type() { DataType::Utf8 | DataType::LargeUtf8 => true, - DataType::Dictionary(_, dt) => matches!(dt.as_ref(), DataType::Utf8 | DataType::LargeUtf8), + DataType::Dictionary(_, dt) => { + matches!(dt.as_ref(), DataType::Utf8 | DataType::LargeUtf8) + } _ => false, }) .collect(); diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 9fc28671d9..06b976879e 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1968,7 +1968,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { "tbl", withDictionary = dictionaryEnabled) { checkSparkAnswerAndOperator("SELECT to_json(named_struct('a', _1, 'b', _2)) FROM tbl") - checkSparkAnswerAndOperator("SELECT to_json(named_struct('nested', named_struct('a', _1, 'b', _2))) FROM tbl") + checkSparkAnswerAndOperator( + "SELECT to_json(named_struct('nested', named_struct('a', _1, 'b', _2))) FROM tbl") } } } From 636b95a40dd0863983de86050a028486abf8722c Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 11 Aug 2024 08:25:56 -0600 Subject: [PATCH 07/25] prepare for review --- native/proto/src/proto/expr.proto | 5 +- native/spark-expr/src/to_json.rs | 4 - ...CometTPCDSMicroBenchmark-jdk11-results.txt | 160 ++++++++++++++++++ .../apache/comet/serde/QueryPlanSerde.scala | 59 +++++-- .../apache/comet/CometExpressionSuite.scala | 20 ++- 5 files changed, 220 insertions(+), 28 deletions(-) create mode 100644 spark/benchmarks/CometTPCDSMicroBenchmark-jdk11-results.txt diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto index 815541fcc2..367b2d0c77 100644 --- a/native/proto/src/proto/expr.proto +++ b/native/proto/src/proto/expr.proto @@ -347,7 +347,10 @@ message StringSpace { message ToJson { Expr child = 1; string timezone = 2; - // TODO options + string date_format = 3; + string timestamp_format = 4; + string timestamp_ntz_format = 5; + bool ignore_null_fields = 6; } message Hour { diff --git a/native/spark-expr/src/to_json.rs b/native/spark-expr/src/to_json.rs index 2abd0e2a77..b9d132ac34 100644 --- a/native/spark-expr/src/to_json.rs +++ b/native/spark-expr/src/to_json.rs @@ -51,7 +51,6 @@ impl ToJson { impl Display for ToJson { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - // TODO options write!(f, "to_json({}, timezone={})", self.expr, self.timezone) } } @@ -59,7 +58,6 @@ impl Display for ToJson { impl PartialEq for ToJson { fn eq(&self, other: &dyn Any) -> bool { if let Some(other) = other.downcast_ref::() { - //TODO compare options self.expr.eq(&other.expr) && self.timezone.eq(&other.timezone) } else { false @@ -97,12 +95,10 @@ impl PhysicalExpr for ToJson { children: Vec>, ) -> Result> { assert!(children.len() == 1); - // TODO options Ok(Arc::new(Self::new(children[0].clone(), &self.timezone))) } fn dyn_hash(&self, state: &mut dyn Hasher) { - // TODO options let mut s = state; self.expr.hash(&mut s); self.timezone.hash(&mut s); diff --git a/spark/benchmarks/CometTPCDSMicroBenchmark-jdk11-results.txt b/spark/benchmarks/CometTPCDSMicroBenchmark-jdk11-results.txt new file mode 100644 index 0000000000..ad77b45609 --- /dev/null +++ b/spark/benchmarks/CometTPCDSMicroBenchmark-jdk11-results.txt @@ -0,0 +1,160 @@ +OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic +AMD Ryzen 9 7950X3D 16-Core Processor +TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +scan_decimal 1302 1322 28 221.1 4.5 1.0X +scan_decimal: Comet (Scan) 1344 1348 7 214.3 4.7 1.0X +scan_decimal: Comet (Scan, Exec) 1311 1338 38 219.7 4.6 1.0X + +OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic +AMD Ryzen 9 7950X3D 16-Core Processor +TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +add_decimals 1670 1695 36 172.4 5.8 1.0X +add_decimals: Comet (Scan) 1746 1758 17 165.0 6.1 1.0X +add_decimals: Comet (Scan, Exec) 1526 1532 9 188.8 5.3 1.1X + +OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic +AMD Ryzen 9 7950X3D 16-Core Processor +TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +add_many_decimals 20802 20884 116 13.8 72.2 1.0X +add_many_decimals: Comet (Scan) 29975 30030 77 9.6 104.1 0.7X +add_many_decimals: Comet (Scan, Exec) 8883 8958 106 32.4 30.8 2.3X + +OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic +AMD Ryzen 9 7950X3D 16-Core Processor +TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +add_many_integers 2082 2108 37 138.3 7.2 1.0X +add_many_integers: Comet (Scan) 1782 1822 57 161.6 6.2 1.2X +add_many_integers: Comet (Scan, Exec) 1961 2021 85 146.8 6.8 1.1X + +OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic +AMD Ryzen 9 7950X3D 16-Core Processor +TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +agg_high_cardinality 1143 1153 14 63.0 15.9 1.0X +agg_high_cardinality: Comet (Scan) 1145 1167 31 62.9 15.9 1.0X +agg_high_cardinality: Comet (Scan, Exec) 712 744 28 101.1 9.9 1.6X + +OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic +AMD Ryzen 9 7950X3D 16-Core Processor +TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +agg_low_cardinality 256 273 15 281.6 3.6 1.0X +agg_low_cardinality: Comet (Scan) 277 288 12 259.5 3.9 0.9X +agg_low_cardinality: Comet (Scan, Exec) 192 206 10 374.6 2.7 1.3X + +OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic +AMD Ryzen 9 7950X3D 16-Core Processor +TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------------- +agg_sum_decimals_no_grouping 10478 10505 39 27.5 36.4 1.0X +agg_sum_decimals_no_grouping: Comet (Scan) 11123 11167 63 25.9 38.6 0.9X +agg_sum_decimals_no_grouping: Comet (Scan, Exec) 8228 8311 118 35.0 28.6 1.3X + +OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic +AMD Ryzen 9 7950X3D 16-Core Processor +TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +-------------------------------------------------------------------------------------------------------------------------------- +agg_sum_integers_no_grouping 1745 1803 81 165.0 6.1 1.0X +agg_sum_integers_no_grouping: Comet (Scan) 1900 1905 7 151.6 6.6 0.9X +agg_sum_integers_no_grouping: Comet (Scan, Exec) 1992 1992 0 144.6 6.9 0.9X + +OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic +AMD Ryzen 9 7950X3D 16-Core Processor +TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +---------------------------------------------------------------------------------------------------------------------------- +case_when_column_or_null 940 959 18 306.3 3.3 1.0X +case_when_column_or_null: Comet (Scan) 960 966 6 300.1 3.3 1.0X +case_when_column_or_null: Comet (Scan, Exec) 879 905 24 327.5 3.1 1.1X + +OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic +AMD Ryzen 9 7950X3D 16-Core Processor +TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +case_when_scalar 202 213 7 355.9 2.8 1.0X +case_when_scalar: Comet (Scan) 188 201 8 383.0 2.6 1.1X +case_when_scalar: Comet (Scan, Exec) 193 212 11 373.9 2.7 1.1X + +OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic +AMD Ryzen 9 7950X3D 16-Core Processor +TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +--------------------------------------------------------------------------------------------------------------------------- +filter_highly_selective 194 203 7 370.3 2.7 1.0X +filter_highly_selective: Comet (Scan) 184 196 6 390.3 2.6 1.1X +filter_highly_selective: Comet (Scan, Exec) 112 121 5 642.7 1.6 1.7X + +OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic +AMD Ryzen 9 7950X3D 16-Core Processor +TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------- +filter_less_selective 194 206 8 371.0 2.7 1.0X +filter_less_selective: Comet (Scan) 183 194 6 393.7 2.5 1.1X +filter_less_selective: Comet (Scan, Exec) 128 138 7 563.3 1.8 1.5X + +OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic +AMD Ryzen 9 7950X3D 16-Core Processor +TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +if_column_or_null 616 625 10 467.8 2.1 1.0X +if_column_or_null: Comet (Scan) 582 612 29 494.7 2.0 1.1X +if_column_or_null: Comet (Scan, Exec) 887 911 21 324.9 3.1 0.7X + +OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic +AMD Ryzen 9 7950X3D 16-Core Processor +TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +join_anti 3916 4138 313 18.4 54.4 1.0X +join_anti: Comet (Scan) 4217 4392 247 17.1 58.6 0.9X +join_anti: Comet (Scan, Exec) 3606 3838 329 20.0 50.1 1.1X + +OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic +AMD Ryzen 9 7950X3D 16-Core Processor +TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +join_condition 1692 1733 58 321.2 3.1 1.0X +join_condition: Comet (Scan) 1579 1603 34 344.1 2.9 1.1X +join_condition: Comet (Scan, Exec) 1635 1636 1 332.3 3.0 1.0X + +OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic +AMD Ryzen 9 7950X3D 16-Core Processor +TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------- +join_exploding_output 1509 1511 4 360.1 2.8 1.0X +join_exploding_output: Comet (Scan) 1320 1335 21 411.6 2.4 1.1X +join_exploding_output: Comet (Scan, Exec) 1411 1433 31 385.0 2.6 1.1X + +OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic +AMD Ryzen 9 7950X3D 16-Core Processor +TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +join_inner 212 218 7 1357.9 0.7 1.0X +join_inner: Comet (Scan) 205 210 4 1408.3 0.7 1.0X +join_inner: Comet (Scan, Exec) 314 319 6 918.3 1.1 0.7X + +OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic +AMD Ryzen 9 7950X3D 16-Core Processor +TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +join_left_outer 89857 90697 1189 3.5 283.6 1.0X +join_left_outer: Comet (Scan) 89060 91560 NaN 3.6 281.1 1.0X +join_left_outer: Comet (Scan, Exec) 88378 89515 1608 3.6 279.0 1.0X + +OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic +AMD Ryzen 9 7950X3D 16-Core Processor +TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +join_semi 9303 9384 115 7.7 129.2 1.0X +join_semi: Comet (Scan) 9187 9798 864 7.8 127.6 1.0X +join_semi: Comet (Scan, Exec) 9074 9127 74 7.9 126.0 1.0X + +OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic +AMD Ryzen 9 7950X3D 16-Core Processor +TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +------------------------------------------------------------------------------------------------------------------------ +rlike 91 98 15 2.2 446.9 1.0X +rlike: Comet (Scan) 88 92 3 2.3 431.4 1.0X +rlike: Comet (Scan, Exec) 61 67 8 3.3 298.5 1.5X + diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 3f81b14f05..ec577435b6 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1215,23 +1215,50 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim withInfo(expr, "StructsToJson with options is not supported") None } else { - // TODO check for supported data types - // TODO check for supported timezone - exprToProto(child, input, binding) match { - case Some(p) => - val toJson = ExprOuterClass.ToJson - .newBuilder() - .setChild(p) - .setTimezone(timezoneId.getOrElse("UTC")) - .build() - Some( - ExprOuterClass.Expr - .newBuilder() - .setToJson(toJson) - .build()) + + def isSupportedType(dt: DataType): Boolean = { + dt match { + case StructType(fields) => + fields.forall(f => isSupportedType(f.dataType)) + case DataTypes.BooleanType | DataTypes.ByteType | DataTypes.ShortType | + DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType | + DataTypes.DoubleType | DataTypes.StringType => + true + case DataTypes.DateType | DataTypes.TimestampType => + // TODO implement these types with tests for formatting options and timezone + false + case _ => false + } + } + + val isSupported = child.dataType match { + case s: StructType => + s.fields.forall(f => isSupportedType(f.dataType)) case _ => - withInfo(expr, child) - None + false + } + + if (isSupported) { + exprToProto(child, input, binding) match { + case Some(p) => + val toJson = ExprOuterClass.ToJson + .newBuilder() + .setChild(p) + .setTimezone(timezoneId.getOrElse("UTC")) + .setIgnoreNullFields(true) + .build() + Some( + ExprOuterClass.Expr + .newBuilder() + .setToJson(toJson) + .build()) + case _ => + withInfo(expr, child) + None + } + } else { + withInfo(expr, "Unsupported data type", child) + None } } diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 06b976879e..bd4962e42b 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1960,16 +1960,22 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("to_json") { Seq(true, false).foreach { dictionaryEnabled => withParquetTable( - (0 until 100).map(i => - ( - i, - if (i % 2 == 0) { "even" } - else { "odd" })), + (0 until 100).map(i => { + val str = if (i % 2 == 0) { + "even" + } else { + "odd" + } + (i.toByte, i.toShort, i, i.toLong, i * 1.2f, -i * 1.2d, str, i.toString) + }), "tbl", withDictionary = dictionaryEnabled) { - checkSparkAnswerAndOperator("SELECT to_json(named_struct('a', _1, 'b', _2)) FROM tbl") + + val fields = Range(1,8).map(n => s"'col$n', _$n").mkString(", ") + + checkSparkAnswerAndOperator(s"SELECT to_json(named_struct($fields)) FROM tbl") checkSparkAnswerAndOperator( - "SELECT to_json(named_struct('nested', named_struct('a', _1, 'b', _2))) FROM tbl") + s"SELECT to_json(named_struct('nested', named_struct($fields))) FROM tbl") } } } From 08ea988d992deb3b961b5c2cad389978ef76e386 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 11 Aug 2024 08:27:58 -0600 Subject: [PATCH 08/25] remove perf results --- ...CometTPCDSMicroBenchmark-jdk11-results.txt | 160 ------------------ 1 file changed, 160 deletions(-) delete mode 100644 spark/benchmarks/CometTPCDSMicroBenchmark-jdk11-results.txt diff --git a/spark/benchmarks/CometTPCDSMicroBenchmark-jdk11-results.txt b/spark/benchmarks/CometTPCDSMicroBenchmark-jdk11-results.txt deleted file mode 100644 index ad77b45609..0000000000 --- a/spark/benchmarks/CometTPCDSMicroBenchmark-jdk11-results.txt +++ /dev/null @@ -1,160 +0,0 @@ -OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic -AMD Ryzen 9 7950X3D 16-Core Processor -TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -scan_decimal 1302 1322 28 221.1 4.5 1.0X -scan_decimal: Comet (Scan) 1344 1348 7 214.3 4.7 1.0X -scan_decimal: Comet (Scan, Exec) 1311 1338 38 219.7 4.6 1.0X - -OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic -AMD Ryzen 9 7950X3D 16-Core Processor -TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -add_decimals 1670 1695 36 172.4 5.8 1.0X -add_decimals: Comet (Scan) 1746 1758 17 165.0 6.1 1.0X -add_decimals: Comet (Scan, Exec) 1526 1532 9 188.8 5.3 1.1X - -OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic -AMD Ryzen 9 7950X3D 16-Core Processor -TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -add_many_decimals 20802 20884 116 13.8 72.2 1.0X -add_many_decimals: Comet (Scan) 29975 30030 77 9.6 104.1 0.7X -add_many_decimals: Comet (Scan, Exec) 8883 8958 106 32.4 30.8 2.3X - -OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic -AMD Ryzen 9 7950X3D 16-Core Processor -TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -add_many_integers 2082 2108 37 138.3 7.2 1.0X -add_many_integers: Comet (Scan) 1782 1822 57 161.6 6.2 1.2X -add_many_integers: Comet (Scan, Exec) 1961 2021 85 146.8 6.8 1.1X - -OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic -AMD Ryzen 9 7950X3D 16-Core Processor -TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -agg_high_cardinality 1143 1153 14 63.0 15.9 1.0X -agg_high_cardinality: Comet (Scan) 1145 1167 31 62.9 15.9 1.0X -agg_high_cardinality: Comet (Scan, Exec) 712 744 28 101.1 9.9 1.6X - -OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic -AMD Ryzen 9 7950X3D 16-Core Processor -TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -agg_low_cardinality 256 273 15 281.6 3.6 1.0X -agg_low_cardinality: Comet (Scan) 277 288 12 259.5 3.9 0.9X -agg_low_cardinality: Comet (Scan, Exec) 192 206 10 374.6 2.7 1.3X - -OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic -AMD Ryzen 9 7950X3D 16-Core Processor -TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------------- -agg_sum_decimals_no_grouping 10478 10505 39 27.5 36.4 1.0X -agg_sum_decimals_no_grouping: Comet (Scan) 11123 11167 63 25.9 38.6 0.9X -agg_sum_decimals_no_grouping: Comet (Scan, Exec) 8228 8311 118 35.0 28.6 1.3X - -OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic -AMD Ryzen 9 7950X3D 16-Core Processor -TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative --------------------------------------------------------------------------------------------------------------------------------- -agg_sum_integers_no_grouping 1745 1803 81 165.0 6.1 1.0X -agg_sum_integers_no_grouping: Comet (Scan) 1900 1905 7 151.6 6.6 0.9X -agg_sum_integers_no_grouping: Comet (Scan, Exec) 1992 1992 0 144.6 6.9 0.9X - -OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic -AMD Ryzen 9 7950X3D 16-Core Processor -TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ----------------------------------------------------------------------------------------------------------------------------- -case_when_column_or_null 940 959 18 306.3 3.3 1.0X -case_when_column_or_null: Comet (Scan) 960 966 6 300.1 3.3 1.0X -case_when_column_or_null: Comet (Scan, Exec) 879 905 24 327.5 3.1 1.1X - -OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic -AMD Ryzen 9 7950X3D 16-Core Processor -TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -case_when_scalar 202 213 7 355.9 2.8 1.0X -case_when_scalar: Comet (Scan) 188 201 8 383.0 2.6 1.1X -case_when_scalar: Comet (Scan, Exec) 193 212 11 373.9 2.7 1.1X - -OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic -AMD Ryzen 9 7950X3D 16-Core Processor -TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ---------------------------------------------------------------------------------------------------------------------------- -filter_highly_selective 194 203 7 370.3 2.7 1.0X -filter_highly_selective: Comet (Scan) 184 196 6 390.3 2.6 1.1X -filter_highly_selective: Comet (Scan, Exec) 112 121 5 642.7 1.6 1.7X - -OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic -AMD Ryzen 9 7950X3D 16-Core Processor -TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------------------------------------- -filter_less_selective 194 206 8 371.0 2.7 1.0X -filter_less_selective: Comet (Scan) 183 194 6 393.7 2.5 1.1X -filter_less_selective: Comet (Scan, Exec) 128 138 7 563.3 1.8 1.5X - -OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic -AMD Ryzen 9 7950X3D 16-Core Processor -TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -if_column_or_null 616 625 10 467.8 2.1 1.0X -if_column_or_null: Comet (Scan) 582 612 29 494.7 2.0 1.1X -if_column_or_null: Comet (Scan, Exec) 887 911 21 324.9 3.1 0.7X - -OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic -AMD Ryzen 9 7950X3D 16-Core Processor -TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -join_anti 3916 4138 313 18.4 54.4 1.0X -join_anti: Comet (Scan) 4217 4392 247 17.1 58.6 0.9X -join_anti: Comet (Scan, Exec) 3606 3838 329 20.0 50.1 1.1X - -OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic -AMD Ryzen 9 7950X3D 16-Core Processor -TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -join_condition 1692 1733 58 321.2 3.1 1.0X -join_condition: Comet (Scan) 1579 1603 34 344.1 2.9 1.1X -join_condition: Comet (Scan, Exec) 1635 1636 1 332.3 3.0 1.0X - -OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic -AMD Ryzen 9 7950X3D 16-Core Processor -TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative -------------------------------------------------------------------------------------------------------------------------- -join_exploding_output 1509 1511 4 360.1 2.8 1.0X -join_exploding_output: Comet (Scan) 1320 1335 21 411.6 2.4 1.1X -join_exploding_output: Comet (Scan, Exec) 1411 1433 31 385.0 2.6 1.1X - -OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic -AMD Ryzen 9 7950X3D 16-Core Processor -TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -join_inner 212 218 7 1357.9 0.7 1.0X -join_inner: Comet (Scan) 205 210 4 1408.3 0.7 1.0X -join_inner: Comet (Scan, Exec) 314 319 6 918.3 1.1 0.7X - -OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic -AMD Ryzen 9 7950X3D 16-Core Processor -TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -join_left_outer 89857 90697 1189 3.5 283.6 1.0X -join_left_outer: Comet (Scan) 89060 91560 NaN 3.6 281.1 1.0X -join_left_outer: Comet (Scan, Exec) 88378 89515 1608 3.6 279.0 1.0X - -OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic -AMD Ryzen 9 7950X3D 16-Core Processor -TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -join_semi 9303 9384 115 7.7 129.2 1.0X -join_semi: Comet (Scan) 9187 9798 864 7.8 127.6 1.0X -join_semi: Comet (Scan, Exec) 9074 9127 74 7.9 126.0 1.0X - -OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-44-generic -AMD Ryzen 9 7950X3D 16-Core Processor -TPCDS Micro Benchmarks: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------------------------------------- -rlike 91 98 15 2.2 446.9 1.0X -rlike: Comet (Scan) 88 92 3 2.3 431.4 1.0X -rlike: Comet (Scan, Exec) 61 67 8 3.3 298.5 1.5X - From 05bbc5d63946ee78fcc595599abeb0ac4130e8c0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 11 Aug 2024 08:46:52 -0600 Subject: [PATCH 09/25] update user guide --- docs/source/user-guide/expressions.md | 102 +++++++++--------- .../apache/comet/CometExpressionSuite.scala | 2 +- 2 files changed, 55 insertions(+), 49 deletions(-) diff --git a/docs/source/user-guide/expressions.md b/docs/source/user-guide/expressions.md index e44f673161..0bf25bf629 100644 --- a/docs/source/user-guide/expressions.md +++ b/docs/source/user-guide/expressions.md @@ -66,32 +66,32 @@ The following Spark expressions are currently available. Any known compatibility ## String Functions -| Expression | Notes | -| --------------- | ----------------------------------------------------------------------------------------------------------- | -| Ascii | | -| BitLength | | -| Chr | | -| ConcatWs | | -| Contains | | -| EndsWith | | -| InitCap | | -| Instr | | -| Length | | -| Like | | -| Lower | | -| OctetLength | | -| Repeat | Negative argument for number of times to repeat causes exception | -| Replace | | -| Reverse | | -| StartsWith | | -| StringSpace | | -| StringTrim | | -| StringTrimBoth | | -| StringTrimLeft | | -| StringTrimRight | | -| Substring | | -| Translate | | -| Upper | | +| Expression | Notes | +| --------------- | ---------------------------------------------------------------- | +| Ascii | | +| BitLength | | +| Chr | | +| ConcatWs | | +| Contains | | +| EndsWith | | +| InitCap | | +| Instr | | +| Length | | +| Like | | +| Lower | | +| OctetLength | | +| Repeat | Negative argument for number of times to repeat causes exception | +| Replace | | +| Reverse | | +| StartsWith | | +| StringSpace | | +| StringTrim | | +| StringTrimBoth | | +| StringTrimLeft | | +| StringTrimRight | | +| Substring | | +| Translate | | +| Upper | | ## Date/Time Functions @@ -108,27 +108,27 @@ The following Spark expressions are currently available. Any known compatibility ## Math Expressions -| Expression | Notes | -| ---------- | ------------------------------------------------------------------- | -| Abs | | -| Acos | | -| Asin | | -| Atan | | -| Atan2 | | -| Ceil | | -| Cos | | -| Exp | | -| Floor | | -| IsNaN | | -| Log | | -| Log2 | | -| Log10 | | -| Pow | | -| Round | | -| Signum | Signum does not differentiate between `0.0` and `-0.0` | -| Sin | | -| Sqrt | | -| Tan | | +| Expression | Notes | +| ---------- | ------------------------------------------------------ | +| Abs | | +| Acos | | +| Asin | | +| Atan | | +| Atan2 | | +| Ceil | | +| Cos | | +| Exp | | +| Floor | | +| IsNaN | | +| Log | | +| Log2 | | +| Log10 | | +| Pow | | +| Round | | +| Signum | Signum does not differentiate between `0.0` and `-0.0` | +| Sin | | +| Sqrt | | +| Tan | | ## Hashing Functions @@ -182,6 +182,13 @@ The following Spark expressions are currently available. Any known compatibility | VariancePop | | | VarianceSamp | | +## Complex Types + +| Expression | Notes | +| ----------------- | ----- | +| CreateNamedStruct | | +| StructsToJson | | + ## Other | Expression | Notes | @@ -191,4 +198,3 @@ The following Spark expressions are currently available. Any known compatibility | ScalarSubquery | | | Coalesce | | | NormalizeNaNAndZero | | -| CreateNamedStruct | | diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index bd4962e42b..4c7172276b 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1971,7 +1971,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { "tbl", withDictionary = dictionaryEnabled) { - val fields = Range(1,8).map(n => s"'col$n', _$n").mkString(", ") + val fields = Range(1, 8).map(n => s"'col$n', _$n").mkString(", ") checkSparkAnswerAndOperator(s"SELECT to_json(named_struct($fields)) FROM tbl") checkSparkAnswerAndOperator( From 525d26009974a570458094976e349fd5be51784d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 11 Aug 2024 09:12:02 -0600 Subject: [PATCH 10/25] add microbenchmark --- .../tpcds-micro-benchmarks/to_json.sql | 20 +++++++++++++++++++ .../benchmark/CometTPCDSMicroBenchmark.scala | 3 ++- 2 files changed, 22 insertions(+), 1 deletion(-) create mode 100644 spark/src/test/resources/tpcds-micro-benchmarks/to_json.sql diff --git a/spark/src/test/resources/tpcds-micro-benchmarks/to_json.sql b/spark/src/test/resources/tpcds-micro-benchmarks/to_json.sql new file mode 100644 index 0000000000..f78af92d64 --- /dev/null +++ b/spark/src/test/resources/tpcds-micro-benchmarks/to_json.sql @@ -0,0 +1,20 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- This is not part of TPC-DS but runs on TPC-DS data + +SELECT to_json(named_struct('id', i_item_sk, 'desc', i_item_desc, 'color', i_color)) FROM item; \ No newline at end of file diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala index aa0c911553..9dd50fc46f 100644 --- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala @@ -73,7 +73,8 @@ object CometTPCDSMicroBenchmark extends CometTPCQueryBenchmarkBase { "join_inner", "join_left_outer", "join_semi", - "rlike") + "rlike", + "to_json") override def runQueries( queryLocation: String, From 5c2f5513b894ff9c9d0a8cde2db299bb1a0affc2 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 11 Aug 2024 09:15:14 -0600 Subject: [PATCH 11/25] remove comment --- native/spark-expr/src/to_json.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/native/spark-expr/src/to_json.rs b/native/spark-expr/src/to_json.rs index b9d132ac34..d07ac7d57d 100644 --- a/native/spark-expr/src/to_json.rs +++ b/native/spark-expr/src/to_json.rs @@ -181,7 +181,6 @@ fn struct_to_json(array: &StructArray, timezone: &str) -> Result { } } json.push('}'); - // TODO how to pass slice here instead of cloning the string? builder.append_value(json.clone()); } } From 6976d1f272a2c145466c27c7f594cc5ea1fa6f1f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 12 Aug 2024 07:33:25 -0600 Subject: [PATCH 12/25] update docs --- docs/source/user-guide/expressions.md | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/docs/source/user-guide/expressions.md b/docs/source/user-guide/expressions.md index 0bf25bf629..aa86402e55 100644 --- a/docs/source/user-guide/expressions.md +++ b/docs/source/user-guide/expressions.md @@ -83,6 +83,7 @@ The following Spark expressions are currently available. Any known compatibility | Repeat | Negative argument for number of times to repeat causes exception | | Replace | | | Reverse | | +| RLike | Regular expression version of LIKE | | StartsWith | | | StringSpace | | | StringTrim | | @@ -184,10 +185,11 @@ The following Spark expressions are currently available. Any known compatibility ## Complex Types -| Expression | Notes | -| ----------------- | ----- | -| CreateNamedStruct | | -| StructsToJson | | +| Expression | Notes | +| ----------------- | --------------------------------- | +| CreateNamedStruct | Create a struct | +| GetElementAt | Access a field in a struct | +| StructsToJson | Convert a struct to a JSON string | ## Other From 02256425a60e612e5fd58b666c0cda252e9fca9f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 12 Aug 2024 07:36:07 -0600 Subject: [PATCH 13/25] reduce size of diff --- docs/source/user-guide/expressions.md | 95 +++++++++++++-------------- 1 file changed, 47 insertions(+), 48 deletions(-) diff --git a/docs/source/user-guide/expressions.md b/docs/source/user-guide/expressions.md index aa86402e55..8724312a72 100644 --- a/docs/source/user-guide/expressions.md +++ b/docs/source/user-guide/expressions.md @@ -66,33 +66,32 @@ The following Spark expressions are currently available. Any known compatibility ## String Functions -| Expression | Notes | -| --------------- | ---------------------------------------------------------------- | -| Ascii | | -| BitLength | | -| Chr | | -| ConcatWs | | -| Contains | | -| EndsWith | | -| InitCap | | -| Instr | | -| Length | | -| Like | | -| Lower | | -| OctetLength | | -| Repeat | Negative argument for number of times to repeat causes exception | -| Replace | | -| Reverse | | -| RLike | Regular expression version of LIKE | -| StartsWith | | -| StringSpace | | -| StringTrim | | -| StringTrimBoth | | -| StringTrimLeft | | -| StringTrimRight | | -| Substring | | -| Translate | | -| Upper | | +| Expression | Notes | +| --------------- | ----------------------------------------------------------------------------------------------------------- | +| Ascii | | +| BitLength | | +| Chr | | +| ConcatWs | | +| Contains | | +| EndsWith | | +| InitCap | | +| Instr | | +| Length | | +| Like | | +| Lower | | +| OctetLength | | +| Repeat | Negative argument for number of times to repeat causes exception | +| Replace | | +| Reverse | | +| StartsWith | | +| StringSpace | | +| StringTrim | | +| StringTrimBoth | | +| StringTrimLeft | | +| StringTrimRight | | +| Substring | | +| Translate | | +| Upper | | ## Date/Time Functions @@ -109,27 +108,27 @@ The following Spark expressions are currently available. Any known compatibility ## Math Expressions -| Expression | Notes | -| ---------- | ------------------------------------------------------ | -| Abs | | -| Acos | | -| Asin | | -| Atan | | -| Atan2 | | -| Ceil | | -| Cos | | -| Exp | | -| Floor | | -| IsNaN | | -| Log | | -| Log2 | | -| Log10 | | -| Pow | | -| Round | | -| Signum | Signum does not differentiate between `0.0` and `-0.0` | -| Sin | | -| Sqrt | | -| Tan | | +| Expression | Notes | +| ---------- | ------------------------------------------------------------------- | +| Abs | | +| Acos | | +| Asin | | +| Atan | | +| Atan2 | | +| Ceil | | +| Cos | | +| Exp | | +| Floor | | +| IsNaN | | +| Log | | +| Log2 | | +| Log10 | | +| Pow | | +| Round | | +| Signum | Signum does not differentiate between `0.0` and `-0.0` | +| Sin | | +| Sqrt | | +| Tan | | ## Hashing Functions From d2f55e29c8b66d94380bdff030a292565ddf15f3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 25 Aug 2024 10:15:53 -0600 Subject: [PATCH 14/25] add failing test for quotes in field names and values --- .../apache/comet/CometExpressionSuite.scala | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 1d9eb89291..fce175c7c3 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1976,6 +1976,33 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } + test("to_json escaped quotes") { + val gen = new DataGenerator(new Random(42)) + val chars = "\\'\"abc" + Seq(true, false).foreach { dictionaryEnabled => + withParquetTable( + (0 until 100).map(_ => { + val str1 = gen.generateString(chars, 8) + val str2 = gen.generateString(chars, 8) + (str1, str2) + }), + "tbl", + withDictionary = dictionaryEnabled) { + + val fields = Range(1, 3) + .map(n => { + val columnName = s"""column "$n"""" + s"'$columnName', _$n" + }) + .mkString(", ") + + checkSparkAnswerAndOperator(s"SELECT to_json(named_struct($fields)) FROM tbl") + checkSparkAnswerAndOperator( + s"SELECT to_json(named_struct('nested', named_struct($fields))) FROM tbl") + } + } + } + test("struct and named_struct with dictionary") { Seq(true, false).foreach { dictionaryEnabled => withParquetTable( From d327a68684d9731f160e4dafd99d5d14e98cc253 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 25 Aug 2024 11:00:54 -0600 Subject: [PATCH 15/25] test passes --- native/spark-expr/src/to_json.rs | 34 +++++++++++++++++++++++++------- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/native/spark-expr/src/to_json.rs b/native/spark-expr/src/to_json.rs index d07ac7d57d..b92b7818bc 100644 --- a/native/spark-expr/src/to_json.rs +++ b/native/spark-expr/src/to_json.rs @@ -121,11 +121,30 @@ fn array_to_json_string(arr: &Arc, timezone: &str) -> Result String { + let mut escaped_string = String::with_capacity(input.len()); + let mut chars = input.chars().peekable(); + while let Some(c) = chars.next() { + match c { + '\"' | '\\' => { + escaped_string.push('\\'); + escaped_string.push(c); + } + _ => escaped_string.push(c), + } + } + escaped_string +} + fn struct_to_json(array: &StructArray, timezone: &str) -> Result { - // get field names - let field_names: Vec = array.fields().iter().map(|f| f.name().clone()).collect(); + // get field names and escape any quotes + let field_names: Vec = array + .fields() + .iter() + .map(|f| escape_quotes(f.name().as_str())) + .collect(); // determine which fields need to have their values quoted - let quotes_needed: Vec = array + let is_string: Vec = array .fields() .iter() .map(|f| match f.data_type() { @@ -170,12 +189,13 @@ fn struct_to_json(array: &StructArray, timezone: &str) -> Result { json.push_str(&field_names[col_index]); json.push_str("\":"); // value - if quotes_needed[col_index] { + let string_value = string_arrays[col_index].value(row_index); + if is_string[col_index] { json.push('"'); - } - json.push_str(string_arrays[col_index].value(row_index)); - if quotes_needed[col_index] { + json.push_str(&escape_quotes(string_value)); json.push('"'); + } else { + json.push_str(string_value); } any_fields_written = true; } From d84f2947085fb844ca50d7117bf335d7d7ff4a22 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 25 Aug 2024 11:05:23 -0600 Subject: [PATCH 16/25] clippy --- native/spark-expr/src/to_json.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native/spark-expr/src/to_json.rs b/native/spark-expr/src/to_json.rs index b92b7818bc..9cee954486 100644 --- a/native/spark-expr/src/to_json.rs +++ b/native/spark-expr/src/to_json.rs @@ -95,7 +95,7 @@ impl PhysicalExpr for ToJson { children: Vec>, ) -> Result> { assert!(children.len() == 1); - Ok(Arc::new(Self::new(children[0].clone(), &self.timezone))) + Ok(Arc::new(Self::new(Arc::clone(&children[0]), &self.timezone))) } fn dyn_hash(&self, state: &mut dyn Hasher) { From d1b6b24f991902eb34997b1eb9c6b6d9f3d65bc9 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 25 Aug 2024 11:07:52 -0600 Subject: [PATCH 17/25] revert a docs change --- docs/source/user-guide/expressions.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/source/user-guide/expressions.md b/docs/source/user-guide/expressions.md index a17bf04de3..c2b372690d 100644 --- a/docs/source/user-guide/expressions.md +++ b/docs/source/user-guide/expressions.md @@ -184,11 +184,11 @@ The following Spark expressions are currently available. Any known compatibility ## Complex Types -| Expression | Notes | -| ----------------- | --------------------------------- | -| CreateNamedStruct | Create a struct | -| GetElementAt | Access a field in a struct | -| StructsToJson | Convert a struct to a JSON string | +| Expression | Notes | +| ----------------- | ----- | +| CreateNamedStruct | | +| GetElementAt | | +| StructsToJson | | ## Other From c34b69a03168d6b5bf8bea2f3ef57e651bc937d8 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 25 Aug 2024 11:08:50 -0600 Subject: [PATCH 18/25] Update native/spark-expr/src/to_json.rs Co-authored-by: Emil Ejbyfeldt --- native/spark-expr/src/to_json.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native/spark-expr/src/to_json.rs b/native/spark-expr/src/to_json.rs index 9cee954486..e2dba3aab9 100644 --- a/native/spark-expr/src/to_json.rs +++ b/native/spark-expr/src/to_json.rs @@ -201,7 +201,7 @@ fn struct_to_json(array: &StructArray, timezone: &str) -> Result { } } json.push('}'); - builder.append_value(json.clone()); + builder.append_value(&json); } } Ok(Arc::new(builder.finish())) From 8e6ca9f32de878142ac8446b151abd4863f536e0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sun, 25 Aug 2024 11:45:49 -0600 Subject: [PATCH 19/25] address feedback --- native/spark-expr/src/to_json.rs | 13 ++++++++----- .../org/apache/comet/serde/QueryPlanSerde.scala | 8 ++++++++ .../org/apache/comet/CometExpressionSuite.scala | 4 ++-- 3 files changed, 18 insertions(+), 7 deletions(-) diff --git a/native/spark-expr/src/to_json.rs b/native/spark-expr/src/to_json.rs index 9cee954486..99d7ec4b0e 100644 --- a/native/spark-expr/src/to_json.rs +++ b/native/spark-expr/src/to_json.rs @@ -95,7 +95,10 @@ impl PhysicalExpr for ToJson { children: Vec>, ) -> Result> { assert!(children.len() == 1); - Ok(Arc::new(Self::new(Arc::clone(&children[0]), &self.timezone))) + Ok(Arc::new(Self::new( + Arc::clone(&children[0]), + &self.timezone, + ))) } fn dyn_hash(&self, state: &mut dyn Hasher) { @@ -121,7 +124,7 @@ fn array_to_json_string(arr: &Arc, timezone: &str) -> Result String { +fn escape_string(input: &str) -> String { let mut escaped_string = String::with_capacity(input.len()); let mut chars = input.chars().peekable(); while let Some(c) = chars.next() { @@ -141,7 +144,7 @@ fn struct_to_json(array: &StructArray, timezone: &str) -> Result { let field_names: Vec = array .fields() .iter() - .map(|f| escape_quotes(f.name().as_str())) + .map(|f| escape_string(f.name().as_str())) .collect(); // determine which fields need to have their values quoted let is_string: Vec = array @@ -192,7 +195,7 @@ fn struct_to_json(array: &StructArray, timezone: &str) -> Result { let string_value = string_arrays[col_index].value(row_index); if is_string[col_index] { json.push('"'); - json.push_str(&escape_quotes(string_value)); + json.push_str(&escape_string(string_value)); json.push('"'); } else { json.push_str(string_value); @@ -201,7 +204,7 @@ fn struct_to_json(array: &StructArray, timezone: &str) -> Result { } } json.push('}'); - builder.append_value(json.clone()); + builder.append_value(&json); } } Ok(Arc::new(builder.finish())) diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index 57c8485bc3..8c92157e0f 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -1275,6 +1275,10 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim case DataTypes.DateType | DataTypes.TimestampType => // TODO implement these types with tests for formatting options and timezone false + case _: MapType | _: ArrayType => + // Spark supports map and array in StructsToJson but this is not yet + // implemented in Comet + false case _ => false } } @@ -1282,6 +1286,10 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim val isSupported = child.dataType match { case s: StructType => s.fields.forall(f => isSupportedType(f.dataType)) + case _: MapType | _: ArrayType => + // Spark supports map and array in StructsToJson but this is not yet + // implemented in Comet + false case _ => false } diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index fce175c7c3..21edc63d75 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1976,9 +1976,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } - test("to_json escaped quotes") { + test("to_json escaping of field names and string values") { val gen = new DataGenerator(new Random(42)) - val chars = "\\'\"abc" + val chars = "\\'\"abc\t" Seq(true, false).foreach { dictionaryEnabled => withParquetTable( (0 until 100).map(_ => { From e76501d122ca4d331431dd8421b41b2c1bab8057 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 26 Aug 2024 08:56:25 -0600 Subject: [PATCH 20/25] support tabs --- native/spark-expr/src/to_json.rs | 17 +++++++++++++++-- .../org/apache/comet/CometExpressionSuite.scala | 12 ++++++------ 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/native/spark-expr/src/to_json.rs b/native/spark-expr/src/to_json.rs index 99d7ec4b0e..1fdf956c71 100644 --- a/native/spark-expr/src/to_json.rs +++ b/native/spark-expr/src/to_json.rs @@ -127,13 +127,26 @@ fn array_to_json_string(arr: &Arc, timezone: &str) -> Result String { let mut escaped_string = String::with_capacity(input.len()); let mut chars = input.chars().peekable(); + let mut is_escaped = false; while let Some(c) = chars.next() { match c { - '\"' | '\\' => { + '\"' | '\\' if !is_escaped => { escaped_string.push('\\'); escaped_string.push(c); + is_escaped = false; + } + '\t' => { + escaped_string.push_str("\\t"); + is_escaped = false; + } + '\\' => { + escaped_string.push('\\'); + is_escaped = true; + } + _ => { + escaped_string.push(c); + is_escaped = false; } - _ => escaped_string.push(c), } } escaped_string diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index 21edc63d75..d6e7b53120 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1969,9 +1969,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { val fields = Range(1, 8).map(n => s"'col$n', _$n").mkString(", ") - checkSparkAnswerAndOperator(s"SELECT to_json(named_struct($fields)) FROM tbl") + checkSparkAnswerAndOperator(s"SELECT to_json(named_struct($fields)) FROM tbl ORDER BY 1") checkSparkAnswerAndOperator( - s"SELECT to_json(named_struct('nested', named_struct($fields))) FROM tbl") + s"SELECT to_json(named_struct('nested', named_struct($fields))) FROM tbl ORDER BY 1") } } } @@ -1981,10 +1981,10 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { val chars = "\\'\"abc\t" Seq(true, false).foreach { dictionaryEnabled => withParquetTable( - (0 until 100).map(_ => { + (0 until 100).map(i => { val str1 = gen.generateString(chars, 8) val str2 = gen.generateString(chars, 8) - (str1, str2) + (i.toString, str1, str2) }), "tbl", withDictionary = dictionaryEnabled) { @@ -1996,9 +1996,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { }) .mkString(", ") - checkSparkAnswerAndOperator(s"SELECT to_json(named_struct($fields)) FROM tbl") checkSparkAnswerAndOperator( - s"SELECT to_json(named_struct('nested', named_struct($fields))) FROM tbl") + """SELECT 'column "1"' x, """ + + s"to_json(named_struct($fields)) FROM tbl ORDER BY x") } } } From 7402e560348e72fd1a0261d000c3e036953c5911 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 26 Aug 2024 09:14:31 -0600 Subject: [PATCH 21/25] newlines --- native/spark-expr/src/to_json.rs | 18 +++++++++++++++++- .../apache/comet/CometExpressionSuite.scala | 2 +- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/native/spark-expr/src/to_json.rs b/native/spark-expr/src/to_json.rs index 1fdf956c71..6b7a62a103 100644 --- a/native/spark-expr/src/to_json.rs +++ b/native/spark-expr/src/to_json.rs @@ -136,7 +136,23 @@ fn escape_string(input: &str) -> String { is_escaped = false; } '\t' => { - escaped_string.push_str("\\t"); + escaped_string.push('\\'); + escaped_string.push('t'); + is_escaped = false; + } + '\r' => { + escaped_string.push('\\'); + escaped_string.push('r'); + is_escaped = false; + } + '\n' => { + escaped_string.push('\\'); + escaped_string.push('n'); + is_escaped = false; + } + '\x0C' => { + escaped_string.push('\\'); + escaped_string.push('f'); is_escaped = false; } '\\' => { diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index d6e7b53120..da2ca226d4 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1978,7 +1978,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("to_json escaping of field names and string values") { val gen = new DataGenerator(new Random(42)) - val chars = "\\'\"abc\t" + val chars = "\\'\"abc\t\r\n\f" Seq(true, false).foreach { dictionaryEnabled => withParquetTable( (0 until 100).map(i => { From e17506de21970255e5dda269b63794620924cb23 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 26 Aug 2024 09:23:12 -0600 Subject: [PATCH 22/25] backspace --- native/spark-expr/src/to_json.rs | 5 ++++ .../apache/comet/CometExpressionSuite.scala | 25 ++++++++++++++++++- 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/native/spark-expr/src/to_json.rs b/native/spark-expr/src/to_json.rs index 6b7a62a103..0ba5af9d93 100644 --- a/native/spark-expr/src/to_json.rs +++ b/native/spark-expr/src/to_json.rs @@ -155,6 +155,11 @@ fn escape_string(input: &str) -> String { escaped_string.push('f'); is_escaped = false; } + '\x08' => { + escaped_string.push('\\'); + escaped_string.push('b'); + is_escaped = false; + } '\\' => { escaped_string.push('\\'); is_escaped = true; diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index da2ca226d4..eec63f729c 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1978,7 +1978,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { test("to_json escaping of field names and string values") { val gen = new DataGenerator(new Random(42)) - val chars = "\\'\"abc\t\r\n\f" + val chars = "\\'\"abc\t\r\n\f\b" Seq(true, false).foreach { dictionaryEnabled => withParquetTable( (0 until 100).map(i => { @@ -2003,6 +2003,29 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } + test("to_json unicode") { + Seq(true, false).foreach { dictionaryEnabled => + withParquetTable( + (0 until 100).map(i => { + (i.toString, "\uD83E\uDD11", "\u018F") + }), + "tbl", + withDictionary = dictionaryEnabled) { + + val fields = Range(1, 3) + .map(n => { + val columnName = s"""column "$n"""" + s"'$columnName', _$n" + }) + .mkString(", ") + + checkSparkAnswerAndOperator( + """SELECT 'column "1"' x, """ + + s"to_json(named_struct($fields)) FROM tbl ORDER BY x") + } + } + } + test("struct and named_struct with dictionary") { Seq(true, false).foreach { dictionaryEnabled => withParquetTable( From a1a7f216a3b0dee33527575647241d0810930c3d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 26 Aug 2024 09:27:48 -0600 Subject: [PATCH 23/25] clippy --- native/spark-expr/src/to_json.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/native/spark-expr/src/to_json.rs b/native/spark-expr/src/to_json.rs index 0ba5af9d93..1e0ad37e37 100644 --- a/native/spark-expr/src/to_json.rs +++ b/native/spark-expr/src/to_json.rs @@ -126,9 +126,8 @@ fn array_to_json_string(arr: &Arc, timezone: &str) -> Result String { let mut escaped_string = String::with_capacity(input.len()); - let mut chars = input.chars().peekable(); let mut is_escaped = false; - while let Some(c) = chars.next() { + for c in input.chars() { match c { '\"' | '\\' if !is_escaped => { escaped_string.push('\\'); From 5b23f1f766a0867c04887e9a9515fd46e09ac292 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 28 Aug 2024 10:38:28 -0600 Subject: [PATCH 24/25] fix test regression --- .../test/scala/org/apache/comet/CometExpressionSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index eec63f729c..c86cfa84ec 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1969,9 +1969,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { val fields = Range(1, 8).map(n => s"'col$n', _$n").mkString(", ") - checkSparkAnswerAndOperator(s"SELECT to_json(named_struct($fields)) FROM tbl ORDER BY 1") + checkSparkAnswerAndOperator(s"SELECT to_json(named_struct($fields)) FROM tbl") checkSparkAnswerAndOperator( - s"SELECT to_json(named_struct('nested', named_struct($fields))) FROM tbl ORDER BY 1") + s"SELECT to_json(named_struct('nested', named_struct($fields))) FROM tbl") } } } From 525c9eb3bcd30beec731c8e6c813f6101bccdc5e Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 28 Aug 2024 13:30:27 -0600 Subject: [PATCH 25/25] cargo fmt --- native/spark-expr/src/to_json.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/native/spark-expr/src/to_json.rs b/native/spark-expr/src/to_json.rs index 050289f54c..2b9a2c5407 100644 --- a/native/spark-expr/src/to_json.rs +++ b/native/spark-expr/src/to_json.rs @@ -119,7 +119,7 @@ fn array_to_json_string(arr: &Arc, timezone: &str) -> Result