From 28b8cb6a9940547561722d7bbbf53613278e080e Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 26 Jun 2025 08:51:27 -0600
Subject: [PATCH 01/11] Add new config

---
 .../main/scala/org/apache/comet/CometConf.scala    | 10 ++++++++++
 .../comet/parquet/CometParquetFileFormat.scala     |  3 ++-
 .../CometParquetPartitionReaderFactory.scala       |  3 ++-
 .../org/apache/comet/serde/QueryPlanSerde.scala    | 15 ++++++++++++---
 4 files changed, 26 insertions(+), 5 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 555beb5cbb..e0f3d4d24f 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -106,6 +106,16 @@ object CometConf extends ShimCometConf {
       .getOrElse("COMET_PARQUET_SCAN_IMPL", SCAN_NATIVE_COMET)
       .toLowerCase(Locale.ROOT))
 
+  val COMET_RESPECT_PARQUET_FILTER_PUSHDOWN_ENABLED: ConfigEntry[Boolean] =
+    conf("spark.comet.parquet.respectFilterPushdown")
+      .doc(
+        "Whether to respect Spark's PARQUET_FILTER_PUSHDOWN_ENABLED config. This needs to be " +
+          "respected when running the Spark SQL test suite, but the default setting " +
+          "results in poor performance in Comet when using the new native scans, so it is " +
+          "disabled by default")
+      .booleanConf
+      .createWithDefault(false)
+
   val COMET_PARQUET_PARALLEL_IO_ENABLED: ConfigEntry[Boolean] =
     conf("spark.comet.parquet.read.parallel.io.enabled")
       .doc(
diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
index 871ac2704d..bcfbd462fe 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
@@ -99,7 +99,8 @@ class CometParquetFileFormat(scanImpl: String)
     val optionsMap = CaseInsensitiveMap[String](options)
     val parquetOptions = new ParquetOptions(optionsMap, sqlConf)
     val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
-    val parquetFilterPushDown = sqlConf.parquetFilterPushDown
+    val parquetFilterPushDown = sqlConf.parquetFilterPushDown &&
+      CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN_ENABLED.get(sqlConf)
 
     // Comet specific configurations
     val capacity = CometConf.COMET_BATCH_SIZE.get(sqlConf)
diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala
index 69cffdd15d..efe30b5324 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala
@@ -67,7 +67,8 @@ case class CometParquetPartitionReaderFactory(
   private val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
   private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
   private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead
-  private val parquetFilterPushDown = sqlConf.parquetFilterPushDown
+  private val parquetFilterPushDown = sqlConf.parquetFilterPushDown &&
+    CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN_ENABLED.get(sqlConf)
 
   // Comet specific configurations
   private val batchSize = CometConf.COMET_BATCH_SIZE.get(sqlConf)
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 9671ef9d75..93f8000803 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -22,6 +22,7 @@ package org.apache.comet.serde
 import java.util.Locale
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
 import scala.math.min
 
 import org.apache.spark.internal.Logging
@@ -2223,9 +2224,17 @@ object QueryPlanSerde extends Logging with CometExprShim {
         // Sink operators don't have children
         result.clearChildren()
 
-        if (conf.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED)) {
-          // TODO remove flatMap and add error handling for unsupported data filters
-          val dataFilters = scan.dataFilters.flatMap(exprToProto(_, scan.output))
+        if (conf.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED) &&
+          CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN_ENABLED.get(conf)) {
+
+          val dataFilters = new ListBuffer[Expr]()
+          for (filter <- scan.dataFilters) {
+            exprToProto(filter, scan.output) match {
+              case Some(proto) => dataFilters += proto
+              case _ =>
+                logWarning(s"Unsupported data filter $filter")
+            }
+          }
           nativeScanBuilder.addAllDataFilters(dataFilters.asJava)
         }
 

From eeca0339efc8a9e11a27fb7d105c074ec1702530 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 26 Jun 2025 09:33:01 -0600
Subject: [PATCH 02/11] fix

---
 common/src/main/scala/org/apache/comet/CometConf.scala        | 2 +-
 docs/source/user-guide/configs.md                             | 1 +
 .../org/apache/comet/parquet/CometParquetFileFormat.scala     | 2 +-
 .../comet/parquet/CometParquetPartitionReaderFactory.scala    | 2 +-
 .../main/scala/org/apache/comet/serde/QueryPlanSerde.scala    | 2 +-
 .../scala/org/apache/comet/parquet/ParquetReadSuite.scala     | 4 ----
 spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala | 1 +
 7 files changed, 6 insertions(+), 8 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index e0f3d4d24f..4d172dc62d 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -106,7 +106,7 @@ object CometConf extends ShimCometConf {
       .getOrElse("COMET_PARQUET_SCAN_IMPL", SCAN_NATIVE_COMET)
       .toLowerCase(Locale.ROOT))
 
-  val COMET_RESPECT_PARQUET_FILTER_PUSHDOWN_ENABLED: ConfigEntry[Boolean] =
+  val COMET_RESPECT_PARQUET_FILTER_PUSHDOWN: ConfigEntry[Boolean] =
     conf("spark.comet.parquet.respectFilterPushdown")
       .doc(
         "Whether to respect Spark's PARQUET_FILTER_PUSHDOWN_ENABLED config. This needs to be " +
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 96bd7cec18..9abd6d1ddb 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -84,6 +84,7 @@ Comet provides the following configuration settings.
 | spark.comet.parquet.read.io.mergeRanges.delta | The delta in bytes between consecutive read ranges below which the parallel reader will try to merge the ranges. The default is 8MB. | 8388608 |
 | spark.comet.parquet.read.parallel.io.enabled | Whether to enable Comet's parallel reader for Parquet files. The parallel reader reads ranges of consecutive data in a file in parallel. It is faster for large files and row groups but uses more resources. | true |
 | spark.comet.parquet.read.parallel.io.thread-pool.size | The maximum number of parallel threads the parallel reader will use in a single executor. For executors configured with a smaller number of cores, use a smaller number. | 16 |
+| spark.comet.parquet.respectFilterPushdown | Whether to respect Spark's PARQUET_FILTER_PUSHDOWN_ENABLED config. This needs to be respected when running the Spark SQL test suite, but the default setting results in poor performance in Comet when using the new native scans, so it is disabled by default | false |
 | spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
 | spark.comet.scan.allowIncompatible | Some Comet scan implementations are not currently fully compatible with Spark for all datatypes. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
 | spark.comet.scan.enabled | Whether to enable native scans. When this is turned on, Spark will use Comet to read supported data sources (currently only Parquet is supported natively). Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. | true |
diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
index bcfbd462fe..5c2de2a6b3 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetFileFormat.scala
@@ -100,7 +100,7 @@ class CometParquetFileFormat(scanImpl: String)
     val parquetOptions = new ParquetOptions(optionsMap, sqlConf)
     val datetimeRebaseModeInRead = parquetOptions.datetimeRebaseModeInRead
     val parquetFilterPushDown = sqlConf.parquetFilterPushDown &&
-      CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN_ENABLED.get(sqlConf)
+      CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.get(sqlConf)
 
     // Comet specific configurations
     val capacity = CometConf.COMET_BATCH_SIZE.get(sqlConf)
diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala
index efe30b5324..a5aa0872cd 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala
@@ -68,7 +68,7 @@ case class CometParquetPartitionReaderFactory(
   private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
   private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead
   private val parquetFilterPushDown = sqlConf.parquetFilterPushDown &&
-    CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN_ENABLED.get(sqlConf)
+    CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.get(sqlConf)
 
   // Comet specific configurations
   private val batchSize = CometConf.COMET_BATCH_SIZE.get(sqlConf)
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 93f8000803..f263c2abea 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2225,7 +2225,7 @@ object QueryPlanSerde extends Logging with CometExprShim {
         result.clearChildren()
 
         if (conf.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED) &&
-          CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN_ENABLED.get(conf)) {
+          CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.get(conf)) {
 
           val dataFilters = new ListBuffer[Expr]()
           for (filter <- scan.dataFilters) {
diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index 4801678a46..66647334f1 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -1902,10 +1902,6 @@ class ParquetReadV1Suite extends ParquetReadSuite with AdaptiveSparkPlanHelper {
         withSQLConf(
           CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanMode,
           SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> pushDown.toString) {
-          if (scanMode == CometConf.SCAN_NATIVE_DATAFUSION && !pushDown) {
-            // FIXME: native_datafusion always pushdown data filters
-            break()
-          }
           Seq(
             ("_1 = true", Math.ceil(rows.toDouble / 2)), // Boolean
             ("_2 = 1", Math.ceil(rows.toDouble / 256)), // Byte
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 05c46e307c..a2663cf0b9 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -78,6 +78,7 @@ abstract class CometTestBase
     conf.set(CometConf.COMET_ENABLED.key, "true")
     conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
     conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
+    conf.set(CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.key, "true")
     conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
     conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
     // set the scan impl to SCAN_NATIVE_COMET because many tests are implemented

From 18cfcec04d8fa995c003a3b224586ce86b8aca23 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 26 Jun 2025 11:31:28 -0600
Subject: [PATCH 03/11] format

---
 .../test/scala/org/apache/comet/parquet/ParquetReadSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
index 66647334f1..581c606115 100644
--- a/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/parquet/ParquetReadSuite.scala
@@ -26,7 +26,7 @@ import java.time.{ZoneId, ZoneOffset}
 import scala.collection.mutable.ListBuffer
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
-import scala.util.control.Breaks.{break, breakable}
+import scala.util.control.Breaks.breakable
 
 import org.scalactic.source.Position
 import org.scalatest.Tag

From 349ced533eb3221372bdc8c38289c25368794850 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 26 Jun 2025 13:29:22 -0600
Subject: [PATCH 04/11] diffs

---
 dev/diffs/3.4.3.diff          | 5 +++--
 dev/diffs/3.5.6.diff          | 5 +++--
 dev/diffs/4.0.0-preview1.diff | 5 +++--
 3 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff
index 617d6a4826..63bab8c82e 100644
--- a/dev/diffs/3.4.3.diff
+++ b/dev/diffs/3.4.3.diff
@@ -2798,10 +2798,10 @@ index dd55fcfe42c..a1d390c93d0 100644
      spark.internalCreateDataFrame(withoutFilters.execute(), schema)
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-index ed2e309fa07..71ba6533c9d 100644
+index ed2e309fa07..a1fb4abe681 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-@@ -74,6 +74,31 @@ trait SharedSparkSessionBase
+@@ -74,6 +74,32 @@ trait SharedSparkSessionBase
        // this rule may potentially block testing of other optimization rules such as
        // ConstantPropagation etc.
        .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
@@ -2810,6 +2810,7 @@ index ed2e309fa07..71ba6533c9d 100644
 +    conf
 +      .set("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions")
 +      .set("spark.comet.enabled", "true")
++      .set("spark.comet.parquet.respectFilterPushdown", "true")
 +
 +    if (!isCometScanOnly) {
 +      conf
diff --git a/dev/diffs/3.5.6.diff b/dev/diffs/3.5.6.diff
index 618ea6a13b..e83e7ed719 100644
--- a/dev/diffs/3.5.6.diff
+++ b/dev/diffs/3.5.6.diff
@@ -2770,10 +2770,10 @@ index e937173a590..ca06132102d 100644
      spark.internalCreateDataFrame(withoutFilters.execute(), schema)
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-index ed2e309fa07..71ba6533c9d 100644
+index ed2e309fa07..a1fb4abe681 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-@@ -74,6 +74,31 @@ trait SharedSparkSessionBase
+@@ -74,6 +74,32 @@ trait SharedSparkSessionBase
        // this rule may potentially block testing of other optimization rules such as
        // ConstantPropagation etc.
        .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
@@ -2782,6 +2782,7 @@ index ed2e309fa07..71ba6533c9d 100644
 +    conf
 +      .set("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions")
 +      .set("spark.comet.enabled", "true")
++      .set("spark.comet.parquet.respectFilterPushdown", "true")
 +
 +    if (!isCometScanOnly) {
 +      conf
diff --git a/dev/diffs/4.0.0-preview1.diff b/dev/diffs/4.0.0-preview1.diff
index 0e9ee5dc4f..bef203d843 100644
--- a/dev/diffs/4.0.0-preview1.diff
+++ b/dev/diffs/4.0.0-preview1.diff
@@ -3009,10 +3009,10 @@ index 5fbf379644f..d0575e1df69 100644
      spark.internalCreateDataFrame(withoutFilters.execute(), schema)
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-index ed2e309fa07..71ba6533c9d 100644
+index ed2e309fa07..a1fb4abe681 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
-@@ -74,6 +74,31 @@ trait SharedSparkSessionBase
+@@ -74,6 +74,32 @@ trait SharedSparkSessionBase
        // this rule may potentially block testing of other optimization rules such as
        // ConstantPropagation etc.
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName) @@ -3021,6 +3021,7 @@ index ed2e309fa07..71ba6533c9d 100644 + conf + .set("spark.sql.extensions", "org.apache.comet.CometSparkSessionExtensions") + .set("spark.comet.enabled", "true") ++ .set("spark.comet.parquet.respectFilterPushdown", "true") + + if (!isCometScanOnly) { + conf From 443137df596477eb3cee86706ab5d5d1862bae76 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Thu, 26 Jun 2025 13:44:44 -0600 Subject: [PATCH 05/11] Clippy fixes for Rust 1.88 --- native/core/benches/bit_util.rs | 14 ++-- native/core/src/common/bit.rs | 24 +++---- native/core/src/common/buffer.rs | 6 +- native/core/src/errors.rs | 10 +-- .../core/src/execution/memory_pools/config.rs | 6 +- .../src/execution/memory_pools/fair_pool.rs | 2 +- .../execution/memory_pools/unified_pool.rs | 4 +- native/core/src/execution/operators/expand.rs | 2 +- native/core/src/execution/operators/scan.rs | 7 +- native/core/src/execution/planner.rs | 41 +++++------- native/core/src/execution/shuffle/codec.rs | 4 +- native/core/src/execution/shuffle/list.rs | 7 +- native/core/src/execution/shuffle/map.rs | 7 +- native/core/src/execution/shuffle/row.rs | 4 +- .../src/execution/shuffle/shuffle_writer.rs | 15 ++--- native/core/src/execution/tracing.rs | 2 +- .../src/execution/util/spark_bloom_filter.rs | 3 +- native/core/src/jvm_bridge/mod.rs | 6 +- native/core/src/lib.rs | 2 +- native/core/src/parquet/mutable_vector.rs | 2 +- native/core/src/parquet/objectstore/s3.rs | 67 ++++++++----------- native/core/src/parquet/read/column.rs | 15 ++--- native/core/src/parquet/read/values.rs | 12 ++-- native/core/src/parquet/util/jni.rs | 8 +-- .../src/parquet/util/test_common/page_util.rs | 2 +- .../src/array_funcs/array_insert.rs | 9 +-- .../array_funcs/get_array_struct_fields.rs | 12 ++-- .../src/array_funcs/list_extract.rs | 9 +-- .../src/datetime_funcs/date_arithmetic.rs | 6 +- native/spark-expr/src/kernels/strings.rs | 5 +- native/spark-expr/src/kernels/temporal.rs | 12 ++-- native/spark-expr/src/math_funcs/ceil.rs | 3 +- native/spark-expr/src/math_funcs/floor.rs | 3 +- native/spark-expr/src/math_funcs/hex.rs | 2 +- .../src/math_funcs/internal/checkoverflow.rs | 6 +- .../src/math_funcs/internal/normalize_nan.rs | 2 +- .../src/nondetermenistic_funcs/rand.rs | 3 +- .../spark-expr/src/predicate_funcs/is_nan.rs | 3 +- .../spark-expr/src/predicate_funcs/rlike.rs | 2 +- .../src/struct_funcs/get_struct_field.rs | 6 +- native/spark-expr/src/timezone.rs | 5 +- 41 files changed, 147 insertions(+), 213 deletions(-) diff --git a/native/core/benches/bit_util.rs b/native/core/benches/bit_util.rs index afffc0d2a6..10d7060134 100644 --- a/native/core/benches/bit_util.rs +++ b/native/core/benches/bit_util.rs @@ -92,7 +92,7 @@ fn criterion_benchmark(c: &mut Criterion) { for num_bytes in (1..=size_of::()).step_by(3) { let x = num_bytes; group.bench_with_input( - BenchmarkId::new("get_aligned", format!("u8_num_bytes_{}", x)), + BenchmarkId::new("get_aligned", format!("u8_num_bytes_{x}")), &x, |b, &x| { let mut reader: BitReader = BitReader::new_all(buffer.slice(0)); @@ -103,7 +103,7 @@ fn criterion_benchmark(c: &mut Criterion) { for num_bytes in (1..=size_of::()).step_by(3) { let x = num_bytes; group.bench_with_input( - BenchmarkId::new("get_aligned", format!("u32_num_bytes_{}", x)), + BenchmarkId::new("get_aligned", format!("u32_num_bytes_{x}")), &x, |b, &x| { let mut reader: BitReader = BitReader::new_all(buffer.slice(0)); @@ -114,7 +114,7 @@ fn criterion_benchmark(c: &mut Criterion) { for 
num_bytes in (1..=size_of::()).step_by(3) { let x = num_bytes; group.bench_with_input( - BenchmarkId::new("get_aligned", format!("i32_num_bytes_{}", x)), + BenchmarkId::new("get_aligned", format!("i32_num_bytes_{x}")), &x, |b, &x| { let mut reader: BitReader = BitReader::new_all(buffer.slice(0)); @@ -127,7 +127,7 @@ fn criterion_benchmark(c: &mut Criterion) { for num_bytes in (1..=size_of::()).step_by(3) { let x = num_bytes * 8; group.bench_with_input( - BenchmarkId::new("get_value", format!("i32_num_bits_{}", x)), + BenchmarkId::new("get_value", format!("i32_num_bits_{x}")), &x, |b, &x| { let mut reader: BitReader = BitReader::new_all(buffer.slice(0)); @@ -140,7 +140,7 @@ fn criterion_benchmark(c: &mut Criterion) { for num_bytes in (1..=8).step_by(7) { let x = num_bytes; group.bench_with_input( - BenchmarkId::new("read_num_bytes_u64", format!("num_bytes_{}", x)), + BenchmarkId::new("read_num_bytes_u64", format!("num_bytes_{x}")), &x, |b, &x| { b.iter(|| read_num_bytes_u64(black_box(x), black_box(buffer.as_slice()))); @@ -152,7 +152,7 @@ fn criterion_benchmark(c: &mut Criterion) { for num_bytes in (1..=4).step_by(3) { let x = num_bytes; group.bench_with_input( - BenchmarkId::new("read_num_bytes_u32", format!("num_bytes_{}", x)), + BenchmarkId::new("read_num_bytes_u32", format!("num_bytes_{x}")), &x, |b, &x| { b.iter(|| read_num_bytes_u32(black_box(x), black_box(buffer.as_slice()))); @@ -164,7 +164,7 @@ fn criterion_benchmark(c: &mut Criterion) { for length in (0..=64).step_by(32) { let x = length; group.bench_with_input( - BenchmarkId::new("trailing_bits", format!("num_bits_{}", x)), + BenchmarkId::new("trailing_bits", format!("num_bits_{x}")), &x, |b, &x| { b.iter(|| trailing_bits(black_box(1234567890), black_box(x))); diff --git a/native/core/src/common/bit.rs b/native/core/src/common/bit.rs index 17bbb29989..354fc81621 100644 --- a/native/core/src/common/bit.rs +++ b/native/core/src/common/bit.rs @@ -954,8 +954,7 @@ impl BitReader { shift += 7; debug_assert!( shift <= MAX_VLQ_BYTE_LEN * 7, - "Num of bytes exceed MAX_VLQ_BYTE_LEN ({})", - MAX_VLQ_BYTE_LEN + "Num of bytes exceed MAX_VLQ_BYTE_LEN ({MAX_VLQ_BYTE_LEN})" ); if likely(byte & 0x80 == 0) { return Some(v); @@ -1326,8 +1325,7 @@ mod tests { (0..total).for_each(|i| { assert!( writer.put_value(values[i], num_bits), - "[{}]: put_value() failed", - i + "[{i}]: put_value() failed" ); }); @@ -1479,8 +1477,7 @@ mod tests { for i in 0..batch.len() { assert_eq!( batch[i], expected_values[i], - "num_bits = {}, index = {}", - num_bits, i + "num_bits = {num_bits}, index = {i}" ); } } @@ -1511,8 +1508,7 @@ mod tests { for i in 0..batch.len() { assert_eq!( batch[i], values[i], - "num_bits = {}, index = {}", - num_bits, i + "num_bits = {num_bits}, index = {i}" ); } } @@ -1554,14 +1550,12 @@ mod tests { if i % 2 == 0 { assert!( writer.put_value(values[j] as u64, num_bits), - "[{}]: put_value() failed", - i + "[{i}]: put_value() failed" ); } else { assert!( writer.put_aligned::(aligned_values[j], aligned_value_byte_width), - "[{}]: put_aligned() failed", - i + "[{i}]: put_aligned() failed" ); } } @@ -1599,8 +1593,7 @@ mod tests { (0..total).for_each(|i| { assert!( writer.put_vlq_int(values[i] as u64), - "[{}]; put_vlq_int() failed", - i + "[{i}]; put_vlq_int() failed" ); }); @@ -1625,8 +1618,7 @@ mod tests { (0..total).for_each(|i| { assert!( writer.put_zigzag_vlq_int(values[i] as i64), - "[{}]; put_zigzag_vlq_int() failed", - i + "[{i}]; put_zigzag_vlq_int() failed" ); }); diff --git a/native/core/src/common/buffer.rs 
b/native/core/src/common/buffer.rs index 291082d104..b78c0db76c 100644 --- a/native/core/src/common/buffer.rs +++ b/native/core/src/common/buffer.rs @@ -75,14 +75,12 @@ impl CometBuffer { assert_eq!( capacity % ALIGNMENT, 0, - "input buffer is not aligned to {} bytes", - ALIGNMENT + "input buffer is not aligned to {ALIGNMENT} bytes" ); Self { data: NonNull::new(ptr as *mut u8).unwrap_or_else(|| { panic!( - "cannot create CometBuffer from (ptr: {:?}, len: {}, capacity: {}", - ptr, len, capacity + "cannot create CometBuffer from (ptr: {ptr:?}, len: {len}, capacity: {capacity}" ) }), len, diff --git a/native/core/src/errors.rs b/native/core/src/errors.rs index 542649262d..b3241477b8 100644 --- a/native/core/src/errors.rs +++ b/native/core/src/errors.rs @@ -439,7 +439,7 @@ enum StacktraceError { fn to_stacktrace_string(msg: String, backtrace_string: String) -> Result { let mut res = String::new(); - write!(&mut res, "{}", msg).map_err(|error| StacktraceError::Message(error.to_string()))?; + write!(&mut res, "{msg}").map_err(|error| StacktraceError::Message(error.to_string()))?; // Use multi-line mode and named capture groups to identify the following stacktrace fields: // - dc = declaredClass @@ -547,9 +547,9 @@ mod tests { .option("-Xcheck:jni") .option(class_path.as_str()) .build() - .unwrap_or_else(|e| panic!("{:#?}", e)); + .unwrap_or_else(|e| panic!("{e:#?}")); - let jvm = JavaVM::new(jvm_args).unwrap_or_else(|e| panic!("{:#?}", e)); + let jvm = JavaVM::new(jvm_args).unwrap_or_else(|e| panic!("{e:#?}")); #[allow(static_mut_refs)] unsafe { @@ -769,7 +769,7 @@ mod tests { .into(); let output = env - .new_string(format!("Hello, {}!", input)) + .new_string(format!("Hello, {input}!")) .expect("Couldn't create java string!"); Ok(output.into_raw()) @@ -869,7 +869,7 @@ mod tests { .unwrap(); let message_string = message.into(); let msg_rust: String = env.get_string(&message_string).unwrap().into(); - println!("{}", msg_rust); + println!("{msg_rust}"); // Since panics result in multi-line messages which include the backtrace, just use the // first line. 
assert_starts_with!(msg_rust, expected_message); diff --git a/native/core/src/execution/memory_pools/config.rs b/native/core/src/execution/memory_pools/config.rs index cd93ea046a..f9a0e23f4f 100644 --- a/native/core/src/execution/memory_pools/config.rs +++ b/native/core/src/execution/memory_pools/config.rs @@ -71,8 +71,7 @@ pub(crate) fn parse_memory_pool_config( } _ => { return Err(CometError::Config(format!( - "Unsupported memory pool type for off-heap mode: {}", - memory_pool_type + "Unsupported memory pool type for off-heap mode: {memory_pool_type}" ))) } } @@ -95,8 +94,7 @@ pub(crate) fn parse_memory_pool_config( "unbounded" => MemoryPoolConfig::new(MemoryPoolType::Unbounded, 0), _ => { return Err(CometError::Config(format!( - "Unsupported memory pool type for on-heap mode: {}", - memory_pool_type + "Unsupported memory pool type for on-heap mode: {memory_pool_type}" ))) } } diff --git a/native/core/src/execution/memory_pools/fair_pool.rs b/native/core/src/execution/memory_pools/fair_pool.rs index bf6133c6d6..a4ec83d829 100644 --- a/native/core/src/execution/memory_pools/fair_pool.rs +++ b/native/core/src/execution/memory_pools/fair_pool.rs @@ -120,7 +120,7 @@ impl MemoryPool for CometFairMemoryPool { panic!("Failed to release {subtractive} bytes where only {size} bytes reserved") } self.release(subtractive) - .unwrap_or_else(|_| panic!("Failed to release {} bytes", subtractive)); + .unwrap_or_else(|_| panic!("Failed to release {subtractive} bytes")); state.used = state.used.checked_sub(subtractive).unwrap(); } } diff --git a/native/core/src/execution/memory_pools/unified_pool.rs b/native/core/src/execution/memory_pools/unified_pool.rs index 4c6790bdfe..8712200d6d 100644 --- a/native/core/src/execution/memory_pools/unified_pool.rs +++ b/native/core/src/execution/memory_pools/unified_pool.rs @@ -80,7 +80,7 @@ impl Drop for CometMemoryPool { fn drop(&mut self) { let used = self.used.load(Relaxed); if used != 0 { - log::warn!("CometMemoryPool dropped with {} bytes still reserved", used); + log::warn!("CometMemoryPool dropped with {used} bytes still reserved"); } } } @@ -95,7 +95,7 @@ impl MemoryPool for CometMemoryPool { fn shrink(&self, _: &MemoryReservation, size: usize) { self.release(size) - .unwrap_or_else(|_| panic!("Failed to release {} bytes", size)); + .unwrap_or_else(|_| panic!("Failed to release {size} bytes")); self.used.fetch_sub(size, Relaxed); } diff --git a/native/core/src/execution/operators/expand.rs b/native/core/src/execution/operators/expand.rs index a342993564..19ca204592 100644 --- a/native/core/src/execution/operators/expand.rs +++ b/native/core/src/execution/operators/expand.rs @@ -77,7 +77,7 @@ impl DisplayAs for ExpandExec { for projection in &self.projections { write!(f, "[")?; for expr in projection { - write!(f, "{}, ", expr)?; + write!(f, "{expr}, ")?; } write!(f, "], ")?; } diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs index c94c2be37b..a842efaa30 100644 --- a/native/core/src/execution/operators/scan.rs +++ b/native/core/src/execution/operators/scan.rs @@ -210,8 +210,7 @@ impl ScanExec { if iter.is_null() { return Err(CometError::from(ExecutionError::GeneralError(format!( - "Null batch iterator object. Plan id: {}", - exec_context_id + "Null batch iterator object. 
Plan id: {exec_context_id}" )))); } @@ -303,14 +302,14 @@ fn scan_schema(input_batch: &InputBatch, data_types: &[DataType]) -> SchemaRef { .map(|(idx, c)| { let datatype = ScanExec::unpack_dictionary_type(c.data_type()); // We don't use the field name. Put a placeholder. - Field::new(format!("col_{}", idx), datatype, true) + Field::new(format!("col_{idx}"), datatype, true) }) .collect::>() } _ => data_types .iter() .enumerate() - .map(|(idx, dt)| Field::new(format!("col_{}", idx), dt.clone(), true)) + .map(|(idx, dt)| Field::new(format!("col_{idx}"), dt.clone(), true)) .collect(), }; diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 0ce1ea7a96..a45036be45 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -323,8 +323,7 @@ impl PhysicalPlanner { let idx = bound.index as usize; if idx >= input_schema.fields().len() { return Err(GeneralError(format!( - "Column index {} is out of bound. Schema: {}", - idx, input_schema + "Column index {idx} is out of bound. Schema: {input_schema}" ))); } let field = input_schema.field(idx); @@ -382,7 +381,7 @@ impl PhysicalPlanner { DataType::List(f) => DataType::List(f).try_into()?, DataType::Null => ScalarValue::Null, dt => { - return Err(GeneralError(format!("{:?} is not supported in Comet", dt))) + return Err(GeneralError(format!("{dt:?} is not supported in Comet"))) } } } else { @@ -395,8 +394,7 @@ impl PhysicalPlanner { DataType::Date32 => ScalarValue::Date32(Some(*value)), dt => { return Err(GeneralError(format!( - "Expected either 'Int32' or 'Date32' for IntVal, but found {:?}", - dt + "Expected either 'Int32' or 'Date32' for IntVal, but found {dt:?}" ))) } }, @@ -410,8 +408,7 @@ impl PhysicalPlanner { } dt => { return Err(GeneralError(format!( - "Expected either 'Int64' or 'Timestamp' for LongVal, but found {:?}", - dt + "Expected either 'Int64' or 'Timestamp' for LongVal, but found {dt:?}" ))) } }, @@ -423,8 +420,7 @@ impl PhysicalPlanner { let big_integer = BigInt::from_signed_bytes_be(value); let integer = big_integer.to_i128().ok_or_else(|| { GeneralError(format!( - "Cannot parse {:?} as i128 for Decimal literal", - big_integer + "Cannot parse {big_integer:?} as i128 for Decimal literal" )) })?; @@ -434,8 +430,7 @@ impl PhysicalPlanner { } dt => { return Err(GeneralError(format!( - "Decimal literal's data type should be Decimal128 but got {:?}", - dt + "Decimal literal's data type should be Decimal128 but got {dt:?}" ))) } } @@ -808,7 +803,7 @@ impl PhysicalPlanner { let child = self.create_expr(expr.child.as_ref().unwrap(), input_schema)?; Ok(Arc::new(RandExpr::new(child, self.partition))) } - expr => Err(GeneralError(format!("Not implemented: {:?}", expr))), + expr => Err(GeneralError(format!("Not implemented: {expr:?}"))), } } @@ -834,7 +829,7 @@ impl PhysicalPlanner { options, }) } - expr => Err(GeneralError(format!("{:?} isn't a SortOrder", expr))), + expr => Err(GeneralError(format!("{expr:?} isn't a SortOrder"))), } } @@ -974,7 +969,7 @@ impl PhysicalPlanner { .enumerate() .map(|(idx, expr)| { self.create_expr(expr, child.schema()) - .map(|r| (r, format!("col_{}", idx))) + .map(|r| (r, format!("col_{idx}"))) }) .collect(); let projection = Arc::new(ProjectionExec::try_new( @@ -1019,7 +1014,7 @@ impl PhysicalPlanner { .enumerate() .map(|(idx, expr)| { self.create_expr(expr, child.schema()) - .map(|r| (r, format!("col_{}", idx))) + .map(|r| (r, format!("col_{idx}"))) }) .collect(); let group_by = PhysicalGroupBy::new_single(group_exprs?); @@ -1055,7 
+1050,7 @@ impl PhysicalPlanner { .enumerate() .map(|(idx, expr)| { self.create_expr(expr, aggregate.schema()) - .map(|r| (r, format!("col_{}", idx))) + .map(|r| (r, format!("col_{idx}"))) }) .collect(); @@ -1338,7 +1333,7 @@ impl PhysicalPlanner { let fields: Vec = datatypes .iter() .enumerate() - .map(|(idx, dt)| Field::new(format!("col_{}", idx), dt.clone(), true)) + .map(|(idx, dt)| Field::new(format!("col_{idx}"), dt.clone(), true)) .collect(); let schema = Arc::new(Schema::new(fields)); @@ -1585,8 +1580,7 @@ impl PhysicalPlanner { Ok(JoinType::RightAnti) => DFJoinType::RightAnti, Err(_) => { return Err(GeneralError(format!( - "Unsupported join type: {:?}", - join_type + "Unsupported join type: {join_type:?}" ))); } }; @@ -1914,8 +1908,7 @@ impl PhysicalPlanner { ) } stats_type => Err(GeneralError(format!( - "Unknown StatisticsType {:?} for Variance", - stats_type + "Unknown StatisticsType {stats_type:?} for Variance" ))), } } @@ -1944,8 +1937,7 @@ impl PhysicalPlanner { Self::create_aggr_func_expr("variance_pop", schema, vec![child], func) } stats_type => Err(GeneralError(format!( - "Unknown StatisticsType {:?} for Variance", - stats_type + "Unknown StatisticsType {stats_type:?} for Variance" ))), } } @@ -1974,8 +1966,7 @@ impl PhysicalPlanner { Self::create_aggr_func_expr("stddev_pop", schema, vec![child], func) } stats_type => Err(GeneralError(format!( - "Unknown StatisticsType {:?} for stddev", - stats_type + "Unknown StatisticsType {stats_type:?} for stddev" ))), } } diff --git a/native/core/src/execution/shuffle/codec.rs b/native/core/src/execution/shuffle/codec.rs index e32ab8c51e..33e6989d4c 100644 --- a/native/core/src/execution/shuffle/codec.rs +++ b/native/core/src/execution/shuffle/codec.rs @@ -102,7 +102,7 @@ impl ShuffleBlockWriter { arrow_writer.write(batch)?; arrow_writer.finish()?; wtr.finish().map_err(|e| { - DataFusionError::Execution(format!("lz4 compression error: {}", e)) + DataFusionError::Execution(format!("lz4 compression error: {e}")) })? } @@ -121,7 +121,7 @@ impl ShuffleBlockWriter { arrow_writer.write(batch)?; arrow_writer.finish()?; wtr.into_inner().map_err(|e| { - DataFusionError::Execution(format!("snappy compression error: {}", e)) + DataFusionError::Execution(format!("snappy compression error: {e}")) })? 
} }; diff --git a/native/core/src/execution/shuffle/list.rs b/native/core/src/execution/shuffle/list.rs index 0cd3f396ce..c2505a80f4 100644 --- a/native/core/src/execution/shuffle/list.rs +++ b/native/core/src/execution/shuffle/list.rs @@ -50,11 +50,11 @@ impl SparkUnsafeArray { let num_elements = i64::from_le_bytes(slice.try_into().unwrap()); if num_elements < 0 { - panic!("Negative number of elements: {}", num_elements); + panic!("Negative number of elements: {num_elements}"); } if num_elements > i32::MAX as i64 { - panic!("Number of elements should <= i32::MAX: {}", num_elements); + panic!("Number of elements should <= i32::MAX: {num_elements}"); } Self { @@ -333,8 +333,7 @@ pub fn append_list_element( } _ => { return Err(CometError::Internal(format!( - "Unsupported data type in list element: {:?}", - element_dt + "Unsupported data type in list element: {element_dt:?}" ))) } } diff --git a/native/core/src/execution/shuffle/map.rs b/native/core/src/execution/shuffle/map.rs index c07bd95e2b..8bc03feb56 100644 --- a/native/core/src/execution/shuffle/map.rs +++ b/native/core/src/execution/shuffle/map.rs @@ -42,19 +42,18 @@ impl SparkUnsafeMap { let key_array_size = i64::from_le_bytes(slice.try_into().unwrap()); if key_array_size < 0 { - panic!("Negative key size in bytes of map: {}", key_array_size); + panic!("Negative key size in bytes of map: {key_array_size}"); } if key_array_size > i32::MAX as i64 { panic!( - "Number of key size in bytes should <= i32::MAX: {}", - key_array_size + "Number of key size in bytes should <= i32::MAX: {key_array_size}" ); } let value_array_size = size - key_array_size as i32 - 8; if value_array_size < 0 { - panic!("Negative value size in bytes of map: {}", value_array_size); + panic!("Negative value size in bytes of map: {value_array_size}"); } let keys = SparkUnsafeArray::new(addr + 8); diff --git a/native/core/src/execution/shuffle/row.rs b/native/core/src/execution/shuffle/row.rs index c98cc54387..e2f335e1b6 100644 --- a/native/core/src/execution/shuffle/row.rs +++ b/native/core/src/execution/shuffle/row.rs @@ -3130,7 +3130,7 @@ fn make_builders( Box::new(StructBuilder::new(fields.clone(), field_builders)) } - _ => return Err(CometError::Internal(format!("Unsupported type: {:?}", dt))), + _ => return Err(CometError::Internal(format!("Unsupported type: {dt:?}"))), }; Ok(builder) @@ -3289,7 +3289,7 @@ fn make_batch(arrays: Vec, row_count: usize) -> Result>(); let schema = Arc::new(Schema::new(fields)); let options = RecordBatchOptions::new().with_row_count(Option::from(row_count)); diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index eb3083bff8..2287d33a18 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -395,7 +395,7 @@ impl MultiPartitionShuffleRepartitioner { .map(|_| PartitionWriter::try_new(shuffle_block_writer.clone())) .collect::>>()?; - let reservation = MemoryConsumer::new(format!("ShuffleRepartitioner[{}]", partition)) + let reservation = MemoryConsumer::new(format!("ShuffleRepartitioner[{partition}]")) .with_can_spill(true) .register(&runtime.memory_pool); @@ -619,8 +619,7 @@ impl MultiPartitionShuffleRepartitioner { // this should be unreachable as long as the validation logic // in the constructor is kept up-to-date return Err(DataFusionError::NotImplemented(format!( - "Unsupported shuffle partitioning scheme {:?}", - other + "Unsupported shuffle partitioning scheme {other:?}" ))); } } @@ -796,7 
+795,7 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { .create(true) .truncate(true) .open(data_file) - .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {:?}", e)))?; + .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; let mut output_data = BufWriter::new(output_data); @@ -834,7 +833,7 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { let mut write_timer = self.metrics.write_time.timer(); let mut output_index = BufWriter::new(File::create(index_file).map_err(|e| { - DataFusionError::Execution(format!("shuffle write error: {:?}", e)) + DataFusionError::Execution(format!("shuffle write error: {e:?}")) })?); for offset in offsets { output_index @@ -1014,7 +1013,7 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner { .create(true) .truncate(true) .open(self.output_index_path.clone()) - .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {:?}", e)))?; + .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; let data_file_length = self .output_data_writer .writer @@ -1036,7 +1035,7 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner { } fn to_df_err(e: Error) -> DataFusionError { - DataFusionError::Execution(format!("shuffle write error: {:?}", e)) + DataFusionError::Execution(format!("shuffle write error: {e:?}")) } /// A helper struct to produce shuffled batches. @@ -1197,7 +1196,7 @@ impl PartitionWriter { .truncate(true) .open(spill_file.path()) .map_err(|e| { - DataFusionError::Execution(format!("Error occurred while spilling {}", e)) + DataFusionError::Execution(format!("Error occurred while spilling {e}")) })?; self.spill_file = Some(SpillFile { temp_file: spill_file, diff --git a/native/core/src/execution/tracing.rs b/native/core/src/execution/tracing.rs index 0555e6b147..01351565f5 100644 --- a/native/core/src/execution/tracing.rs +++ b/native/core/src/execution/tracing.rs @@ -86,7 +86,7 @@ impl Recorder { fn get_thread_id() -> u64 { let thread_id = std::thread::current().id(); - format!("{:?}", thread_id) + format!("{thread_id:?}") .trim_start_matches("ThreadId(") .trim_end_matches(")") .parse() diff --git a/native/core/src/execution/util/spark_bloom_filter.rs b/native/core/src/execution/util/spark_bloom_filter.rs index 79e541c59f..ad77cdb497 100644 --- a/native/core/src/execution/util/spark_bloom_filter.rs +++ b/native/core/src/execution/util/spark_bloom_filter.rs @@ -61,8 +61,7 @@ impl From<&[u8]> for SparkBloomFilter { offset += 4; assert_eq!( version, SPARK_BLOOM_FILTER_VERSION_1, - "Unsupported BloomFilter version: {}, expecting version: {}", - version, SPARK_BLOOM_FILTER_VERSION_1 + "Unsupported BloomFilter version: {version}, expecting version: {SPARK_BLOOM_FILTER_VERSION_1}" ); let num_hash_functions = read_num_be_bytes!(i32, 4, buf[offset..]); offset += 4; diff --git a/native/core/src/jvm_bridge/mod.rs b/native/core/src/jvm_bridge/mod.rs index 00e58b7099..53ba24ed6d 100644 --- a/native/core/src/jvm_bridge/mod.rs +++ b/native/core/src/jvm_bridge/mod.rs @@ -272,8 +272,7 @@ impl JVMClasses<'_> { let java_vm = JAVA_VM.get_unchecked(); java_vm.attach_current_thread().map_err(|e| { CometError::Internal(format!( - "JVMClasses::get_env() failed to attach current thread: {}", - e + "JVMClasses::get_env() failed to attach current thread: {e}" )) }) } @@ -361,8 +360,7 @@ fn get_throwable_message( let cause_class_name = get_throwable_class_name(env, jvm_classes, &cause)?; let cause_message = get_throwable_message(env, 
jvm_classes, &cause)?; Ok(format!( - "{}\nCaused by: {}: {}", - message_str, cause_class_name, cause_message + "{message_str}\nCaused by: {cause_class_name}: {cause_message}" )) } else { Ok(message_str) diff --git a/native/core/src/lib.rs b/native/core/src/lib.rs index d83ef836a8..e8df170744 100644 --- a/native/core/src/lib.rs +++ b/native/core/src/lib.rs @@ -93,7 +93,7 @@ pub extern "system" fn Java_org_apache_comet_NativeBase_init( JAVA_VM.get_or_init(|| java_vm); let comet_version = env!("CARGO_PKG_VERSION"); - info!("Comet native library version {} initialized", comet_version); + info!("Comet native library version {comet_version} initialized"); Ok(()) }) } diff --git a/native/core/src/parquet/mutable_vector.rs b/native/core/src/parquet/mutable_vector.rs index a84966c13c..6187a5bac8 100644 --- a/native/core/src/parquet/mutable_vector.rs +++ b/native/core/src/parquet/mutable_vector.rs @@ -225,7 +225,7 @@ impl ParquetMutableVector { ArrowDataType::FixedSizeBinary(type_length) => *type_length as usize * 8, ArrowDataType::Decimal128(..) => 128, // Arrow stores decimal with 16 bytes ArrowDataType::Binary | ArrowDataType::Utf8 => 32, // Only count offset size - dt => panic!("Unsupported Arrow data type: {:?}", dt), + dt => panic!("Unsupported Arrow data type: {dt:?}"), } } diff --git a/native/core/src/parquet/objectstore/s3.rs b/native/core/src/parquet/objectstore/s3.rs index c1301272ff..b48210de5f 100644 --- a/native/core/src/parquet/objectstore/s3.rs +++ b/native/core/src/parquet/objectstore/s3.rs @@ -63,7 +63,7 @@ pub fn create_store( if scheme != ObjectStoreScheme::AmazonS3 { return Err(object_store::Error::Generic { store: "S3", - source: format!("Scheme of URL is not S3: {}", url).into(), + source: format!("Scheme of URL is not S3: {url}").into(), }); } let path = Path::from_url_path(path)?; @@ -84,7 +84,7 @@ pub fn create_store( }; let s3_configs = extract_s3_config_options(configs, bucket); - debug!("S3 configs for bucket {}: {:?}", bucket, s3_configs); + debug!("S3 configs for bucket {bucket}: {s3_configs:?}"); // When using the default AWS S3 endpoint (no custom endpoint configured), a valid region // is required. 
If no region is explicitly configured, attempt to auto-resolve it by @@ -94,7 +94,7 @@ pub fn create_store( { let region = get_runtime().block_on(resolve_bucket_region(bucket, &ClientOptions::new()))?; - debug!("resolved region: {:?}", region); + debug!("resolved region: {region:?}"); builder = builder.with_config(AmazonS3ConfigKey::Region, region.to_string()); } @@ -178,16 +178,16 @@ fn normalize_endpoint( } let endpoint = if !endpoint.starts_with("http://") && !endpoint.starts_with("https://") { - format!("https://{}", endpoint) + format!("https://{endpoint}") } else { endpoint.to_string() }; if virtual_hosted_style_request { if endpoint.ends_with("/") { - Some(format!("{}{}", endpoint, bucket)) + Some(format!("{endpoint}{bucket}")) } else { - Some(format!("{}/{}", endpoint, bucket)) + Some(format!("{endpoint}/{bucket}")) } } else { Some(endpoint) // Avoid extra to_string() call since endpoint is already a String @@ -199,9 +199,9 @@ fn get_config<'a>( bucket: &str, property: &str, ) -> Option<&'a String> { - let per_bucket_key = format!("fs.s3a.bucket.{}.{}", bucket, property); + let per_bucket_key = format!("fs.s3a.bucket.{bucket}.{property}"); configs.get(&per_bucket_key).or_else(|| { - let global_key = format!("fs.s3a.{}", property); + let global_key = format!("fs.s3a.{property}"); configs.get(&global_key) }) } @@ -362,8 +362,7 @@ fn build_aws_credential_provider_metadata( _ => Err(object_store::Error::Generic { store: "S3", source: format!( - "Unsupported credential provider: {}", - credential_provider_name + "Unsupported credential provider: {credential_provider_name}" ) .into(), }), @@ -508,7 +507,7 @@ impl CachedAwsCredentialProvider { async fn refresh_credential(&self) -> object_store::Result { let credentials = self.provider.provide_credentials().await.map_err(|e| { - error!("Failed to retrieve credentials: {:?}", e); + error!("Failed to retrieve credentials: {e:?}"); object_store::Error::Generic { store: "S3", source: Box::new(e), @@ -631,7 +630,7 @@ impl CredentialProviderMetadata { CredentialProviderMetadata::Environment => "Environment".to_string(), CredentialProviderMetadata::WebIdentity => "WebIdentity".to_string(), CredentialProviderMetadata::Static { is_valid, .. 
} => { - format!("Static(valid: {})", is_valid) + format!("Static(valid: {is_valid})") } CredentialProviderMetadata::AssumeRole { role_arn, @@ -776,7 +775,7 @@ mod tests { fn with_bucket_credential_provider(mut self, bucket: &str, provider: &str) -> Self { self.configs.insert( - format!("fs.s3a.bucket.{}.aws.credentials.provider", bucket), + format!("fs.s3a.bucket.{bucket}.aws.credentials.provider"), provider.to_string(), ); self @@ -802,7 +801,7 @@ mod tests { fn with_bucket_access_key(mut self, bucket: &str, key: &str) -> Self { self.configs.insert( - format!("fs.s3a.bucket.{}.access.key", bucket), + format!("fs.s3a.bucket.{bucket}.access.key"), key.to_string(), ); self @@ -810,7 +809,7 @@ mod tests { fn with_bucket_secret_key(mut self, bucket: &str, key: &str) -> Self { self.configs.insert( - format!("fs.s3a.bucket.{}.secret.key", bucket), + format!("fs.s3a.bucket.{bucket}.secret.key"), key.to_string(), ); self @@ -818,7 +817,7 @@ mod tests { fn with_bucket_session_token(mut self, bucket: &str, token: &str) -> Self { self.configs.insert( - format!("fs.s3a.bucket.{}.session.token", bucket), + format!("fs.s3a.bucket.{bucket}.session.token"), token.to_string(), ); self @@ -904,8 +903,7 @@ mod tests { assert_eq!(credential_provider_names, vec![HADOOP_ANONYMOUS]); let aws_credential_provider_names = format!( - "{},{},{}", - HADOOP_ANONYMOUS, AWS_ENVIRONMENT, AWS_ENVIRONMENT_V1 + "{HADOOP_ANONYMOUS},{AWS_ENVIRONMENT},{AWS_ENVIRONMENT_V1}" ); let credential_provider_names = parse_credential_provider_names(&aws_credential_provider_names); @@ -915,8 +913,7 @@ mod tests { ); let aws_credential_provider_names = format!( - " {}, {},, {},", - HADOOP_ANONYMOUS, AWS_ENVIRONMENT, AWS_ENVIRONMENT_V1 + " {HADOOP_ANONYMOUS}, {AWS_ENVIRONMENT},, {AWS_ENVIRONMENT_V1}," ); let credential_provider_names = parse_credential_provider_names(&aws_credential_provider_names); @@ -926,8 +923,7 @@ mod tests { ); let aws_credential_provider_names = format!( - "\n {},\n {},\n , \n {},\n", - HADOOP_ANONYMOUS, AWS_ENVIRONMENT, AWS_ENVIRONMENT_V1 + "\n {HADOOP_ANONYMOUS},\n {AWS_ENVIRONMENT},\n , \n {AWS_ENVIRONMENT_V1},\n" ); let credential_provider_names = parse_credential_provider_names(&aws_credential_provider_names); @@ -984,7 +980,7 @@ mod tests { #[cfg_attr(miri, ignore)] // AWS credential providers call foreign functions async fn test_mixed_anonymous_and_other_providers_error() { let configs = TestConfigBuilder::new() - .with_credential_provider(&format!("{},{}", HADOOP_ANONYMOUS, AWS_ENVIRONMENT)) + .with_credential_provider(&format!("{HADOOP_ANONYMOUS},{AWS_ENVIRONMENT}")) .build(); let result = @@ -1404,8 +1400,7 @@ mod tests { // Test three providers in chain: Environment -> IMDS -> ECS let configs = TestConfigBuilder::new() .with_credential_provider(&format!( - "{},{},{}", - AWS_ENVIRONMENT, AWS_INSTANCE_PROFILE, AWS_CONTAINER_CREDENTIALS + "{AWS_ENVIRONMENT},{AWS_INSTANCE_PROFILE},{AWS_CONTAINER_CREDENTIALS}" )) .build(); @@ -1433,8 +1428,7 @@ mod tests { // Test chaining static credentials -> environment -> web identity let configs = TestConfigBuilder::new() .with_credential_provider(&format!( - "{},{},{}", - HADOOP_SIMPLE, AWS_ENVIRONMENT, AWS_WEB_IDENTITY + "{HADOOP_SIMPLE},{AWS_ENVIRONMENT},{AWS_WEB_IDENTITY}" )) .with_access_key("chain_access_key") .with_secret_key("chain_secret_key") @@ -1536,8 +1530,7 @@ mod tests { .with_assume_role_arn("arn:aws:iam::123456789012:role/chained-role") .with_assume_role_session_name("chained-base-session") .with_assume_role_credentials_provider(&format!( - "{},{},{}", - 
HADOOP_SIMPLE, AWS_ENVIRONMENT, AWS_INSTANCE_PROFILE + "{HADOOP_SIMPLE},{AWS_ENVIRONMENT},{AWS_INSTANCE_PROFILE}" )) .with_access_key("chained_base_access_key") .with_secret_key("chained_base_secret_key") @@ -1576,14 +1569,12 @@ mod tests { // Test assume role as first provider in a chain, followed by environment and IMDS let configs = TestConfigBuilder::new() .with_credential_provider(&format!( - " {}\n, {}\n", - HADOOP_ASSUMED_ROLE, AWS_INSTANCE_PROFILE + " {HADOOP_ASSUMED_ROLE}\n, {AWS_INSTANCE_PROFILE}\n" )) .with_assume_role_arn("arn:aws:iam::123456789012:role/first-in-chain") .with_assume_role_session_name("first-chain-session") .with_assume_role_credentials_provider(&format!( - " {}\n, {}\n, {}\n", - AWS_WEB_IDENTITY, HADOOP_TEMPORARY, AWS_ENVIRONMENT + " {AWS_WEB_IDENTITY}\n, {HADOOP_TEMPORARY}\n, {AWS_ENVIRONMENT}\n" )) .with_access_key("assume_role_base_key") .with_secret_key("assume_role_base_secret") @@ -1706,7 +1697,7 @@ mod tests { async fn test_invalid_static_credential_provider_should_not_prevent_other_providers_from_working( ) { let configs = TestConfigBuilder::new() - .with_credential_provider(&format!("{},{}", HADOOP_TEMPORARY, HADOOP_SIMPLE)) + .with_credential_provider(&format!("{HADOOP_TEMPORARY},{HADOOP_SIMPLE}")) .with_access_key("test_access_key") .with_secret_key("test_secret_key") .build(); @@ -1757,8 +1748,8 @@ mod tests { { let cnt = self.counter.fetch_add(1, Ordering::SeqCst); let cred = Credentials::builder() - .access_key_id(format!("test_access_key_{}", cnt)) - .secret_access_key(format!("test_secret_key_{}", cnt)) + .access_key_id(format!("test_access_key_{cnt}")) + .secret_access_key(format!("test_secret_key_{cnt}")) .expiry(SystemTime::now() + Duration::from_secs(60)) .provider_name("mock_provider") .build(); @@ -1781,8 +1772,8 @@ mod tests { ); for k in 0..3 { let credential = cached_provider.get_credential().await.unwrap(); - assert_eq!(credential.key_id, format!("test_access_key_{}", k)); - assert_eq!(credential.secret_key, format!("test_secret_key_{}", k)); + assert_eq!(credential.key_id, format!("test_access_key_{k}")); + assert_eq!(credential.secret_key, format!("test_secret_key_{k}")); } } diff --git a/native/core/src/parquet/read/column.rs b/native/core/src/parquet/read/column.rs index 43b9d5ada9..9988a89f1f 100644 --- a/native/core/src/parquet/read/column.rs +++ b/native/core/src/parquet/read/column.rs @@ -293,7 +293,7 @@ impl ColumnReader { UInt64DecimalColumnReader, ArrowDataType::Decimal128(20u8, 0i8) ), - _ => panic!("Unsupported INT64 annotation: {:?}", lt), + _ => panic!("Unsupported INT64 annotation: {lt:?}"), }, LogicalType::Decimal { scale: _, @@ -351,7 +351,7 @@ impl ColumnReader { } } } - lt => panic!("Unsupported logical type for INT64: {:?}", lt), + lt => panic!("Unsupported logical type for INT64: {lt:?}"), } } else { match promotion_info.physical_type { @@ -385,7 +385,7 @@ impl ColumnReader { // We support type promotion from float to double PhysicalType::FLOAT => typed_reader!(FloatColumnReader, Float32), PhysicalType::DOUBLE => typed_reader!(FloatToDoubleColumnReader, Float64), - t => panic!("Unsupported read physical type: {} for FLOAT", t), + t => panic!("Unsupported read physical type: {t} for FLOAT"), }, PhysicalType::DOUBLE => typed_reader!(DoubleColumnReader, Float64), @@ -396,7 +396,7 @@ impl ColumnReader { // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md // "enum type should interpret ENUM annotated field as a UTF-8" LogicalType::Enum => typed_reader!(StringColumnReader, Utf8), - lt => 
panic!("Unsupported logical type for BYTE_ARRAY: {:?}", lt), + lt => panic!("Unsupported logical type for BYTE_ARRAY: {lt:?}"), } } else { typed_reader!(ByteArrayColumnReader, Binary) @@ -438,7 +438,7 @@ impl ColumnReader { ArrowDataType::FixedSizeBinary(type_length) ) } - t => panic!("Unsupported logical type for FIXED_LEN_BYTE_ARRAY: {:?}", t), + t => panic!("Unsupported logical type for FIXED_LEN_BYTE_ARRAY: {t:?}"), } } else { let type_length = desc.type_length(); @@ -775,7 +775,7 @@ impl TypedColumnReader { } if encoding != Encoding::PLAIN_DICTIONARY { - panic!("Invalid encoding type for Parquet dictionary: {}", encoding); + panic!("Invalid encoding type for Parquet dictionary: {encoding}"); } if self.vector.dictionary.is_some() { @@ -973,8 +973,7 @@ impl TypedColumnReader { fn check_const(&mut self, method_name: &str) { assert!( self.value_decoder.is_none(), - "{} cannot be called after set_page_v1/set_page_v2!", - method_name + "{method_name} cannot be called after set_page_v1/set_page_v2!" ); assert!(!self.is_const, "can only set constant once!"); self.is_const = true; diff --git a/native/core/src/parquet/read/values.rs b/native/core/src/parquet/read/values.rs index e28d695ecd..84fe2991e4 100644 --- a/native/core/src/parquet/read/values.rs +++ b/native/core/src/parquet/read/values.rs @@ -44,7 +44,7 @@ pub fn get_decoder( } // This is for dictionary indices Encoding::RLE_DICTIONARY => Box::new(DictDecoder::new(value_data)), - _ => panic!("Unsupported encoding: {}", encoding), + _ => panic!("Unsupported encoding: {encoding}"), }; decoder } @@ -325,11 +325,10 @@ impl PlainDecoding for Int64TimestampMillisType { // efficient if unlikely(v < JULIAN_GREGORIAN_SWITCH_OFF_TS) { panic!( - "Encountered timestamp value {}, which is before 1582-10-15 (counting \ + "Encountered timestamp value {v}, which is before 1582-10-15 (counting \ backwards from Unix epoch date 1970-01-01), and could be ambigous \ depending on whether a legacy Julian/Gregorian hybrid calendar is used, \ - or a Proleptic Gregorian calendar is used.", - v + or a Proleptic Gregorian calendar is used." ); } @@ -842,11 +841,10 @@ impl PlainDecoding for Int96TimestampMicrosType { if unlikely(micros < JULIAN_GREGORIAN_SWITCH_OFF_TS) { panic!( - "Encountered timestamp value {}, which is before 1582-10-15 (counting \ + "Encountered timestamp value {micros}, which is before 1582-10-15 (counting \ backwards from Unix epoch date 1970-01-01), and could be ambigous \ depending on whether a legacy Julian/Gregorian hybrid calendar is used, \ - or a Proleptic Gregorian calendar is used.", - micros + or a Proleptic Gregorian calendar is used." 
); } diff --git a/native/core/src/parquet/util/jni.rs b/native/core/src/parquet/util/jni.rs index 6be7a7269b..d7d86a5e28 100644 --- a/native/core/src/parquet/util/jni.rs +++ b/native/core/src/parquet/util/jni.rs @@ -94,7 +94,7 @@ pub fn convert_encoding(ordinal: jint) -> Encoding { 6 => Encoding::DELTA_LENGTH_BYTE_ARRAY, 7 => Encoding::DELTA_BYTE_ARRAY, 8 => Encoding::RLE_DICTIONARY, - _ => panic!("Invalid Java Encoding ordinal: {}", ordinal), + _ => panic!("Invalid Java Encoding ordinal: {ordinal}"), } } @@ -153,7 +153,7 @@ fn convert_physical_type(id: jint) -> PhysicalType { 5 => PhysicalType::DOUBLE, 6 => PhysicalType::BYTE_ARRAY, 7 => PhysicalType::FIXED_LEN_BYTE_ARRAY, - _ => panic!("Invalid id for Parquet physical type: {} ", id), + _ => panic!("Invalid id for Parquet physical type: {id} "), } } @@ -180,7 +180,7 @@ fn convert_logical_type( }, 5 => LogicalType::Enum, 6 => LogicalType::Uuid, - _ => panic!("Invalid id for Parquet logical type: {}", id), + _ => panic!("Invalid id for Parquet logical type: {id}"), } } @@ -189,7 +189,7 @@ fn convert_time_unit(time_unit: jint) -> TimeUnit { 0 => TimeUnit::MILLIS(MilliSeconds::new()), 1 => TimeUnit::MICROS(MicroSeconds::new()), 2 => TimeUnit::NANOS(NanoSeconds::new()), - _ => panic!("Invalid time unit id for Parquet: {}", time_unit), + _ => panic!("Invalid time unit id for Parquet: {time_unit}"), } } diff --git a/native/core/src/parquet/util/test_common/page_util.rs b/native/core/src/parquet/util/test_common/page_util.rs index c87505a481..e4e3e30c45 100644 --- a/native/core/src/parquet/util/test_common/page_util.rs +++ b/native/core/src/parquet/util/test_common/page_util.rs @@ -288,7 +288,7 @@ pub fn make_pages( Encoding::PLAIN => { pb.add_values::(encoding, &values[value_range]); } - enc => panic!("Unexpected encoding {}", enc), + enc => panic!("Unexpected encoding {enc}"), } let data_page = pb.consume(); diff --git a/native/spark-expr/src/array_funcs/array_insert.rs b/native/spark-expr/src/array_funcs/array_insert.rs index 93946e8d25..eb96fec12f 100644 --- a/native/spark-expr/src/array_funcs/array_insert.rs +++ b/native/spark-expr/src/array_funcs/array_insert.rs @@ -86,8 +86,7 @@ impl ArrayInsert { DataType::List(field) => Ok(DataType::List(Arc::clone(field))), DataType::LargeList(field) => Ok(DataType::LargeList(Arc::clone(field))), data_type => Err(DataFusionError::Internal(format!( - "Unexpected src array type in ArrayInsert: {:?}", - data_type + "Unexpected src array type in ArrayInsert: {data_type:?}" ))), } } @@ -248,8 +247,7 @@ fn array_insert( let new_array_len = std::cmp::max(end - start + 1, corrected_pos); if new_array_len > MAX_ROUNDED_ARRAY_LENGTH { return Err(DataFusionError::Internal(format!( - "Max array length in Spark is {:?}, but got {:?}", - MAX_ROUNDED_ARRAY_LENGTH, new_array_len + "Max array length in Spark is {MAX_ROUNDED_ARRAY_LENGTH:?}, but got {new_array_len:?}" ))); } @@ -275,8 +273,7 @@ fn array_insert( let new_array_len = (-pos + base_offset).as_usize(); if new_array_len > MAX_ROUNDED_ARRAY_LENGTH { return Err(DataFusionError::Internal(format!( - "Max array length in Spark is {:?}, but got {:?}", - MAX_ROUNDED_ARRAY_LENGTH, new_array_len + "Max array length in Spark is {MAX_ROUNDED_ARRAY_LENGTH:?}, but got {new_array_len:?}" ))); } mutable_values.extend(1, row_index, row_index + 1); diff --git a/native/spark-expr/src/array_funcs/get_array_struct_fields.rs b/native/spark-expr/src/array_funcs/get_array_struct_fields.rs index 6571a80817..e63fe1f519 100644 --- 
diff --git a/native/spark-expr/src/array_funcs/get_array_struct_fields.rs b/native/spark-expr/src/array_funcs/get_array_struct_fields.rs
index 6571a80817..e63fe1f519 100644
--- a/native/spark-expr/src/array_funcs/get_array_struct_fields.rs
+++ b/native/spark-expr/src/array_funcs/get_array_struct_fields.rs
@@ -59,8 +59,7 @@ impl GetArrayStructFields {
         match self.child.data_type(input_schema)? {
             DataType::List(field) | DataType::LargeList(field) => Ok(field),
             data_type => Err(DataFusionError::Internal(format!(
-                "Unexpected data type in GetArrayStructFields: {:?}",
-                data_type
+                "Unexpected data type in GetArrayStructFields: {data_type:?}"
             ))),
         }
     }
@@ -69,8 +68,7 @@ impl GetArrayStructFields {
         match self.list_field(input_schema)?.data_type() {
             DataType::Struct(fields) => Ok(Arc::clone(&fields[self.ordinal])),
             data_type => Err(DataFusionError::Internal(format!(
-                "Unexpected data type in GetArrayStructFields: {:?}",
-                data_type
+                "Unexpected data type in GetArrayStructFields: {data_type:?}"
             ))),
         }
     }
@@ -87,8 +85,7 @@ impl PhysicalExpr for GetArrayStructFields {
             DataType::List(_) => Ok(DataType::List(struct_field)),
             DataType::LargeList(_) => Ok(DataType::LargeList(struct_field)),
             data_type => Err(DataFusionError::Internal(format!(
-                "Unexpected data type in GetArrayStructFields: {:?}",
-                data_type
+                "Unexpected data type in GetArrayStructFields: {data_type:?}"
             ))),
         }
     }
@@ -113,8 +110,7 @@ impl PhysicalExpr for GetArrayStructFields {
                 get_array_struct_fields(list_array, self.ordinal)
             }
             data_type => Err(DataFusionError::Internal(format!(
-                "Unexpected child type for ListExtract: {:?}",
-                data_type
+                "Unexpected child type for ListExtract: {data_type:?}"
             ))),
         }
     }
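get_array_struct_fields.rs repeats the same dispatch shape four times: accept List/LargeList, and report anything else as a Debug-formatted internal error. A minimal stand-alone sketch of that shape; the enum below is a stand-in, not arrow's DataType:

    #[derive(Debug)]
    enum DataType {
        List,
        LargeList,
        Int32,
    }

    fn list_field(dt: DataType) -> Result<DataType, String> {
        match dt {
            DataType::List | DataType::LargeList => Ok(dt),
            // `{data_type:?}` captures the binding and formats it with Debug,
            // matching the `{:?}` spec of the old positional form.
            data_type => Err(format!(
                "Unexpected data type in GetArrayStructFields: {data_type:?}"
            )),
        }
    }

    fn main() {
        assert!(list_field(DataType::List).is_ok());
        assert!(list_field(DataType::Int32).is_err());
    }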
diff --git a/native/spark-expr/src/array_funcs/list_extract.rs b/native/spark-expr/src/array_funcs/list_extract.rs
index 7336fb65fc..f015d4e9d7 100644
--- a/native/spark-expr/src/array_funcs/list_extract.rs
+++ b/native/spark-expr/src/array_funcs/list_extract.rs
@@ -80,8 +80,7 @@ impl ListExtract {
         match self.child.data_type(input_schema)? {
             DataType::List(field) | DataType::LargeList(field) => Ok(field),
             data_type => Err(DataFusionError::Internal(format!(
-                "Unexpected data type in ListExtract: {:?}",
-                data_type
+                "Unexpected data type in ListExtract: {data_type:?}"
             ))),
         }
     }
@@ -121,8 +120,7 @@ impl PhysicalExpr for ListExtract {
                 }
                 ColumnarValue::Scalar(scalar) => Ok(scalar),
                 v => Err(DataFusionError::Execution(format!(
-                    "Expected scalar default value for ListExtract, got {:?}",
-                    v
+                    "Expected scalar default value for ListExtract, got {v:?}"
                 ))),
             })
         })
@@ -161,8 +159,7 @@ impl PhysicalExpr for ListExtract {
                 )
             }
             data_type => Err(DataFusionError::Internal(format!(
-                "Unexpected child type for ListExtract: {:?}",
-                data_type
+                "Unexpected child type for ListExtract: {data_type:?}"
             ))),
         }
     }
diff --git a/native/spark-expr/src/datetime_funcs/date_arithmetic.rs b/native/spark-expr/src/datetime_funcs/date_arithmetic.rs
index f3b764593a..4b4db2eb5c 100644
--- a/native/spark-expr/src/datetime_funcs/date_arithmetic.rs
+++ b/native/spark-expr/src/datetime_funcs/date_arithmetic.rs
@@ -79,8 +79,7 @@ fn spark_date_arithmetic(
             }
             _ => {
                 return Err(DataFusionError::Internal(format!(
-                    "Unsupported data types {:?} for date arithmetic.",
-                    args,
+                    "Unsupported data types {args:?} for date arithmetic.",
                 )))
             }
         }
@@ -88,8 +87,7 @@ fn spark_date_arithmetic(
         datum::apply(start, &interval_cv, op)
     }
     _ => Err(DataFusionError::Internal(format!(
-        "Unsupported data types {:?} for date arithmetic.",
-        args,
+        "Unsupported data types {args:?} for date arithmetic.",
     ))),
 }
diff --git a/native/spark-expr/src/kernels/strings.rs b/native/spark-expr/src/kernels/strings.rs
index 00da382396..59029a3862 100644
--- a/native/spark-expr/src/kernels/strings.rs
+++ b/native/spark-expr/src/kernels/strings.rs
@@ -45,8 +45,7 @@ pub fn string_space(length: &dyn Array) -> Result<ArrayRef> {
             Ok(Arc::new(result))
         }
         dt => panic!(
-            "Unsupported input type for function 'string_space': {:?}",
-            dt
+            "Unsupported input type for function 'string_space': {dt:?}"
         ),
     }
 }
@@ -82,7 +81,7 @@ pub fn substring(array: &dyn Array, start: i64, length: u64) -> Result<ArrayRef> {
-        dt => panic!("Unsupported input type for function 'substring': {:?}", dt),
+        dt => panic!("Unsupported input type for function 'substring': {dt:?}"),
     }
 }
diff --git a/native/spark-expr/src/kernels/temporal.rs b/native/spark-expr/src/kernels/temporal.rs
index a6d2a87d1c..09e2c905c7 100644
--- a/native/spark-expr/src/kernels/temporal.rs
+++ b/native/spark-expr/src/kernels/temporal.rs
@@ -307,8 +307,7 @@ where
                 |dt| as_days_from_unix_epoch(trunc_date_to_week(dt)),
             )),
             _ => Err(SparkError::Internal(format!(
-                "Unsupported format: {:?} for function 'date_trunc'",
-                format
+                "Unsupported format: {format:?} for function 'date_trunc'"
             ))),
         },
         dt => return_compute_error_with!(
@@ -399,8 +398,7 @@ pub(crate) fn date_trunc_array_fmt_dyn(
         )
         .map(|a| Arc::new(a) as ArrayRef),
         (dt, fmt) => Err(SparkError::Internal(format!(
-            "Unsupported datatype: {:}, format: {:?} for function 'date_trunc'",
-            dt, fmt
+            "Unsupported datatype: {dt:}, format: {fmt:?} for function 'date_trunc'"
         ))),
     }
 }
@@ -588,8 +586,7 @@ where
             })
         }
         _ => Err(SparkError::Internal(format!(
-            "Unsupported format: {:?} for function 'timestamp_trunc'",
-            format
+            "Unsupported format: {format:?} for function 'timestamp_trunc'"
         ))),
     }
 }
@@ -668,8 +665,7 @@ pub(crate) fn timestamp_trunc_array_fmt_dyn(
             )
         }
         (dt, fmt) => Err(SparkError::Internal(format!(
-            "Unsupported datatype: {:}, format: {:?} for function 'timestamp_trunc'",
-            dt, fmt
+            "Unsupported datatype: {dt:}, format: {fmt:?} for function 'timestamp_trunc'"
         ))),
     }
 }
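The temporal.rs hunks preserve an empty format spec: the old `{:}` becomes `{dt:}` rather than `{dt}`. The two forms are equivalent (an empty spec after the colon means default Display formatting), so the rendered message is unchanged:

    fn main() {
        let dt = "Timestamp(Microsecond, None)"; // illustrative value
        assert_eq!(
            format!("Unsupported datatype: {dt:}"),
            format!("Unsupported datatype: {dt}")
        );
    }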
'timestamp_trunc'" ))), } } diff --git a/native/spark-expr/src/math_funcs/ceil.rs b/native/spark-expr/src/math_funcs/ceil.rs index 2d6b84ab32..e797a1c865 100644 --- a/native/spark-expr/src/math_funcs/ceil.rs +++ b/native/spark-expr/src/math_funcs/ceil.rs @@ -51,8 +51,7 @@ pub fn spark_ceil( make_decimal_array(array, precision, scale, &f) } other => Err(DataFusionError::Internal(format!( - "Unsupported data type {:?} for function ceil", - other, + "Unsupported data type {other:?} for function ceil", ))), }, ColumnarValue::Scalar(a) => match a { diff --git a/native/spark-expr/src/math_funcs/floor.rs b/native/spark-expr/src/math_funcs/floor.rs index 5547d78a27..8e74294280 100644 --- a/native/spark-expr/src/math_funcs/floor.rs +++ b/native/spark-expr/src/math_funcs/floor.rs @@ -51,8 +51,7 @@ pub fn spark_floor( make_decimal_array(array, precision, scale, &f) } other => Err(DataFusionError::Internal(format!( - "Unsupported data type {:?} for function floor", - other, + "Unsupported data type {other:?} for function floor", ))), }, ColumnarValue::Scalar(a) => match a { diff --git a/native/spark-expr/src/math_funcs/hex.rs b/native/spark-expr/src/math_funcs/hex.rs index 52b7d8c08f..4837e0946d 100644 --- a/native/spark-expr/src/math_funcs/hex.rs +++ b/native/spark-expr/src/math_funcs/hex.rs @@ -31,7 +31,7 @@ use datafusion::logical_expr::ColumnarValue; use std::fmt::Write; fn hex_int64(num: i64) -> String { - format!("{:X}", num) + format!("{num:X}") } #[inline(always)] diff --git a/native/spark-expr/src/math_funcs/internal/checkoverflow.rs b/native/spark-expr/src/math_funcs/internal/checkoverflow.rs index a584b4e9c7..2276dba3d9 100644 --- a/native/spark-expr/src/math_funcs/internal/checkoverflow.rs +++ b/native/spark-expr/src/math_funcs/internal/checkoverflow.rs @@ -105,8 +105,7 @@ impl PhysicalExpr for CheckOverflow { DataType::Decimal128(p, s) => (p, s), dt => { return Err(DataFusionError::Execution(format!( - "CheckOverflow expects only Decimal128, but got {:?}", - dt + "CheckOverflow expects only Decimal128, but got {dt:?}" ))) } }; @@ -147,8 +146,7 @@ impl PhysicalExpr for CheckOverflow { ))) } v => Err(DataFusionError::Execution(format!( - "CheckOverflow's child expression should be decimal array, but found {:?}", - v + "CheckOverflow's child expression should be decimal array, but found {v:?}" ))), } } diff --git a/native/spark-expr/src/math_funcs/internal/normalize_nan.rs b/native/spark-expr/src/math_funcs/internal/normalize_nan.rs index c043713065..7f564aa0df 100644 --- a/native/spark-expr/src/math_funcs/internal/normalize_nan.rs +++ b/native/spark-expr/src/math_funcs/internal/normalize_nan.rs @@ -87,7 +87,7 @@ impl PhysicalExpr for NormalizeNaNAndZero { let new_array = Float64Array::from(v); Ok(ColumnarValue::Array(Arc::new(new_array))) } - dt => panic!("Unexpected data type {:?}", dt), + dt => panic!("Unexpected data type {dt:?}"), } } diff --git a/native/spark-expr/src/nondetermenistic_funcs/rand.rs b/native/spark-expr/src/nondetermenistic_funcs/rand.rs index 903661f4ff..d82c2cd92e 100644 --- a/native/spark-expr/src/nondetermenistic_funcs/rand.rs +++ b/native/spark-expr/src/nondetermenistic_funcs/rand.rs @@ -163,8 +163,7 @@ impl PhysicalExpr for RandExpr { match self.seed.evaluate(batch)? 
From 097dad3ed2efb0e0c9b75bf48a75a48b9dcc30cd Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 26 Jun 2025 13:46:47 -0600
Subject: [PATCH 06/11] format

---
 native/core/src/common/bit.rs                       |  5 +----
 native/core/src/execution/shuffle/map.rs            |  4 +---
 .../core/src/execution/shuffle/shuffle_writer.rs    |  7 ++++---
 native/core/src/parquet/objectstore/s3.rs           | 15 +++++----------
 native/spark-expr/src/kernels/strings.rs            |  4 +---
 5 files changed, 12 insertions(+), 23 deletions(-)

diff --git a/native/core/src/common/bit.rs b/native/core/src/common/bit.rs
index 354fc81621..9a444cb60c 100644
--- a/native/core/src/common/bit.rs
+++ b/native/core/src/common/bit.rs
@@ -1506,10 +1506,7 @@ mod tests {
                 reader.get_u32_batch(batch.as_mut_ptr(), *total, num_bits);
             }
             for i in 0..batch.len() {
-                assert_eq!(
-                    batch[i], values[i],
-                    "num_bits = {num_bits}, index = {i}"
-                );
+                assert_eq!(batch[i], values[i], "num_bits = {num_bits}, index = {i}");
             }
         }
     }
diff --git a/native/core/src/execution/shuffle/map.rs b/native/core/src/execution/shuffle/map.rs
index 8bc03feb56..37df344d63 100644
--- a/native/core/src/execution/shuffle/map.rs
+++ b/native/core/src/execution/shuffle/map.rs
@@ -46,9 +46,7 @@ impl SparkUnsafeMap {
         }
 
         if key_array_size > i32::MAX as i64 {
-            panic!(
-                "Number of key size in bytes should <= i32::MAX: {key_array_size}"
-            );
+            panic!("Number of key size in bytes should <= i32::MAX: {key_array_size}");
         }
 
         let value_array_size = size - key_array_size as i32 - 8;
diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs
index 2287d33a18..d23d02a2c8 100644
--- a/native/core/src/execution/shuffle/shuffle_writer.rs
+++ b/native/core/src/execution/shuffle/shuffle_writer.rs
@@ -832,9 +832,10 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner {
         offsets[num_output_partitions] = output_data.stream_position().map_err(to_df_err)?;
 
         let mut write_timer = self.metrics.write_time.timer();
-        let mut output_index = BufWriter::new(File::create(index_file).map_err(|e| {
-            DataFusionError::Execution(format!("shuffle write error: {e:?}"))
-        })?);
+        let mut output_index =
+            BufWriter::new(File::create(index_file).map_err(|e| {
+                DataFusionError::Execution(format!("shuffle write error: {e:?}"))
+            })?);
         for offset in offsets {
             output_index
                 .write_all(&(offset as i64).to_le_bytes()[..])
diff --git a/native/core/src/parquet/objectstore/s3.rs b/native/core/src/parquet/objectstore/s3.rs
index b48210de5f..e93f99a87b 100644
--- a/native/core/src/parquet/objectstore/s3.rs
+++ b/native/core/src/parquet/objectstore/s3.rs
@@ -361,10 +361,7 @@ fn build_aws_credential_provider_metadata(
         AWS_WEB_IDENTITY_V1 | AWS_WEB_IDENTITY => Ok(CredentialProviderMetadata::WebIdentity),
         _ => Err(object_store::Error::Generic {
             store: "S3",
-            source: format!(
-                "Unsupported credential provider: {credential_provider_name}"
-            )
-            .into(),
+            source: format!("Unsupported credential provider: {credential_provider_name}").into(),
         }),
     }
 }
@@ -902,9 +899,8 @@ mod tests {
         let credential_provider_names = parse_credential_provider_names(HADOOP_ANONYMOUS);
         assert_eq!(credential_provider_names, vec![HADOOP_ANONYMOUS]);
 
-        let aws_credential_provider_names = format!(
-            "{HADOOP_ANONYMOUS},{AWS_ENVIRONMENT},{AWS_ENVIRONMENT_V1}"
-        );
+        let aws_credential_provider_names =
+            format!("{HADOOP_ANONYMOUS},{AWS_ENVIRONMENT},{AWS_ENVIRONMENT_V1}");
         let credential_provider_names =
             parse_credential_provider_names(&aws_credential_provider_names);
         assert_eq!(
@@ -912,9 +908,8 @@ mod tests {
             vec![HADOOP_ANONYMOUS, AWS_ENVIRONMENT, AWS_ENVIRONMENT_V1]
         );
 
-        let aws_credential_provider_names = format!(
-            " {HADOOP_ANONYMOUS}, {AWS_ENVIRONMENT},, {AWS_ENVIRONMENT_V1},"
-        );
+        let aws_credential_provider_names =
+            format!(" {HADOOP_ANONYMOUS}, {AWS_ENVIRONMENT},, {AWS_ENVIRONMENT_V1},");
        let credential_provider_names =
             parse_credential_provider_names(&aws_credential_provider_names);
         assert_eq!(
diff --git a/native/spark-expr/src/kernels/strings.rs b/native/spark-expr/src/kernels/strings.rs
index 59029a3862..f9e0b13117 100644
--- a/native/spark-expr/src/kernels/strings.rs
+++ b/native/spark-expr/src/kernels/strings.rs
@@ -44,9 +44,7 @@ pub fn string_space(length: &dyn Array) -> Result<ArrayRef> {
             let result = DictionaryArray::try_new(dict.keys().clone(), values)?;
             Ok(Arc::new(result))
         }
-        dt => panic!(
-            "Unsupported input type for function 'string_space': {dt:?}"
-        ),
+        dt => panic!("Unsupported input type for function 'string_space': {dt:?}"),
     }
 }
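None of the hunks in this "format" commit change a string or any behavior; they are rustfmt reflows. Inlining the arguments in the earlier commits changed the length of several call sites relative to rustfmt's default 100-column width, so `cargo fmt` re-wraps them. A hedged sketch of the effect; the function and names below are illustrative, not Comet code:

    // Before inlining, the positional argument kept the call over the width
    // limit, so rustfmt split it across lines:
    //     source: format!(
    //         "Unsupported credential provider: {}",
    //         credential_provider_name
    //     )
    //     .into(),
    // After inlining, the call fits on one line and rustfmt joins it:
    fn describe(credential_provider_name: &str) -> String {
        format!("Unsupported credential provider: {credential_provider_name}")
    }

    fn main() {
        println!("{}", describe("anonymous"));
    }
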
From a5cf22900bb4488114ee2bcfde0482c85f9f1246 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 26 Jun 2025 13:52:33 -0600
Subject: [PATCH 07/11] more

---
 native/hdfs/src/object_store/hdfs.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/native/hdfs/src/object_store/hdfs.rs b/native/hdfs/src/object_store/hdfs.rs
index a880af8716..0123782716 100644
--- a/native/hdfs/src/object_store/hdfs.rs
+++ b/native/hdfs/src/object_store/hdfs.rs
@@ -470,7 +470,7 @@ fn check_modified(
         if last_modified <= date {
             return Err(Error::NotModified {
                 path: location.to_string(),
-                source: format!("{} >= {}", date, last_modified).into(),
+                source: format!("{date} >= {last_modified}").into(),
             });
         }
     }
@@ -479,7 +479,7 @@ fn check_modified(
         if last_modified > date {
             return Err(Error::Precondition {
                 path: location.to_string(),
-                source: format!("{} < {}", date, last_modified).into(),
+                source: format!("{date} < {last_modified}").into(),
             });
         }
     }

From 17ddec6b12126ad1fa3a58c9c5161bdb1f1ddbbf Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 27 Jun 2025 12:47:01 -0600
Subject: [PATCH 08/11] Update common/src/main/scala/org/apache/comet/CometConf.scala

Co-authored-by: Oleks V
---
 common/src/main/scala/org/apache/comet/CometConf.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 4d172dc62d..f7ec9659bf 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -111,8 +111,7 @@ object CometConf extends ShimCometConf {
       .doc(
         "Whether to respect Spark's PARQUET_FILTER_PUSHDOWN_ENABLED config. This needs to be " +
           "respected when running the Spark SQL test suite but the default setting " +
-          "results in poor performance in Comet when using the new native scans, so we want " +
-          "to disable by default")
+          "results in poor performance in Comet when using the new native scans, disabled by default")
       .booleanConf
       .createWithDefault(false)

From bcf4d88069232651df241feb2d48574a16c224b5 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 27 Jun 2025 12:47:07 -0600
Subject: [PATCH 09/11] Update docs/source/user-guide/configs.md

Co-authored-by: Oleks V
---
 docs/source/user-guide/configs.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 9abd6d1ddb..6544909aae 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -84,7 +84,7 @@ Comet provides the following configuration settings.
 | spark.comet.parquet.read.io.mergeRanges.delta | The delta in bytes between consecutive read ranges below which the parallel reader will try to merge the ranges. The default is 8MB. | 8388608 |
 | spark.comet.parquet.read.parallel.io.enabled | Whether to enable Comet's parallel reader for Parquet files. The parallel reader reads ranges of consecutive data in a file in parallel. It is faster for large files and row groups but uses more resources. | true |
 | spark.comet.parquet.read.parallel.io.thread-pool.size | The maximum number of parallel threads the parallel reader will use in a single executor. For executors configured with a smaller number of cores, use a smaller number. | 16 |
-| spark.comet.parquet.respectFilterPushdown | Whether to respect Spark's PARQUET_FILTER_PUSHDOWN_ENABLED config. This needs to be respected when running the Spark SQL test suite but the default setting results in poor performance in Comet when using the new native scans, so we want to disable by default | false |
+| spark.comet.parquet.respectFilterPushdown | Whether to respect Spark's PARQUET_FILTER_PUSHDOWN_ENABLED config. This needs to be respected when running the Spark SQL test suite but the default setting results in poor performance in Comet when using the new native scans, disabled by default | false |
 | spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
 | spark.comet.scan.allowIncompatible | Some Comet scan implementations are not currently fully compatible with Spark for all datatypes. Set this config to true to allow them anyway. For more information, refer to the Comet Compatibility Guide (https://datafusion.apache.org/comet/user-guide/compatibility.html). | false |
 | spark.comet.scan.enabled | Whether to enable native scans. When this is turned on, Spark will use Comet to read supported data sources (currently only Parquet is supported natively). Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. | true |
From 7d0a51f4d003c0c19c2c2092af74b0dd2894a1d8 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 27 Jun 2025 12:48:21 -0600
Subject: [PATCH 10/11] address feedback

---
 .../comet/parquet/CometParquetPartitionReaderFactory.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala b/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala
index a5aa0872cd..69cffdd15d 100644
--- a/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala
+++ b/spark/src/main/scala/org/apache/comet/parquet/CometParquetPartitionReaderFactory.scala
@@ -67,8 +67,7 @@ case class CometParquetPartitionReaderFactory(
   private val pushDownStringPredicate = sqlConf.parquetFilterPushDownStringPredicate
   private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold
   private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead
-  private val parquetFilterPushDown = sqlConf.parquetFilterPushDown &&
-    CometConf.COMET_RESPECT_PARQUET_FILTER_PUSHDOWN.get(sqlConf)
+  private val parquetFilterPushDown = sqlConf.parquetFilterPushDown
 
   // Comet specific configurations
   private val batchSize = CometConf.COMET_BATCH_SIZE.get(sqlConf)

From e7b2a5850e48a9f479d107bf1e9b2109b4100368 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 27 Jun 2025 13:03:14 -0600
Subject: [PATCH 11/11] scalastyle

---
 common/src/main/scala/org/apache/comet/CometConf.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index f7ec9659bf..ee18fb9504 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -111,7 +111,8 @@ object CometConf extends ShimCometConf {
       .doc(
         "Whether to respect Spark's PARQUET_FILTER_PUSHDOWN_ENABLED config. This needs to be " +
           "respected when running the Spark SQL test suite but the default setting " +
-          "results in poor performance in Comet when using the new native scans, disabled by default")
+          "results in poor performance in Comet when using the new native scans, " +
+          "disabled by default")
       .booleanConf
       .createWithDefault(false)
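A final property worth spot-checking after the format-argument commits in this series: inlining preserves whatever spec follows the colon (`:X`, `:?`, widths, and so on), so every message renders byte-for-byte the same as before. A tiny self-contained check with illustrative values:

    fn main() {
        let num: i64 = 48879;
        let dt = vec![1, 2, 3]; // any Debug-formattable value

        // Only the positional argument list disappears; the format specs
        // (`X` for upper hex, `?` for Debug) carry over unchanged.
        assert_eq!(format!("{:X}", num), format!("{num:X}"));
        assert_eq!(format!("{:?}", dt), format!("{dt:?}"));
    }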