Merged
2 changes: 1 addition & 1 deletion docs/source/user-guide/configs.md
@@ -63,8 +63,8 @@ Comet provides the following configuration settings.
| spark.comet.nativeLoadRequired | Whether to require Comet native library to load successfully when Comet is enabled. If not, Comet will silently fallback to Spark when it fails to load the native lib. Otherwise, an error will be thrown and the Spark job will be aborted. | false |
| spark.comet.parquet.enable.directBuffer | Whether to use Java direct byte buffer when reading Parquet. By default, this is false | false |
| spark.comet.regexp.allowIncompatible | Comet is not currently fully compatible with Spark for all regular expressions. Set this config to true to allow them anyway using Rust's regular expression engine. See compatibility guide for more information. | false |
| spark.comet.sparkToColumnar.supportedOperatorList | A comma-separated list of operators that will be converted to Comet columnar format when 'spark.comet.sparkToColumnar.enabled' is true | Range,InMemoryTableScan |
| spark.comet.scan.enabled | Whether to enable Comet scan. When this is turned on, Spark will use Comet to read Parquet data source. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. By default, this config is true. | true |
| spark.comet.scan.preFetch.enabled | Whether to enable pre-fetching feature of CometScan. By default is disabled. | false |
| spark.comet.scan.preFetch.threadNum | The number of threads running pre-fetching for CometScan. Effective if spark.comet.scan.preFetch.enabled is enabled. By default it is 2. Note that more pre-fetching threads means more memory requirement to store pre-fetched row groups. | 2 |
| spark.comet.shuffle.preferDictionary.ratio | The ratio of total values to distinct values in a string column to decide whether to prefer dictionary encoding when shuffling the column. If the ratio is higher than this config, dictionary encoding will be used on shuffling string column. This config is effective if it is higher than 1.0. By default, this config is 10.0. Note that this config is only used when `spark.comet.exec.shuffle.mode` is `jvm`. | 10.0 |
| spark.comet.sparkToColumnar.supportedOperatorList | A comma-separated list of operators that will be converted to Comet columnar format when 'spark.comet.sparkToColumnar.enabled' is true | Range,InMemoryTableScan |
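The `spark.comet.shuffle.preferDictionary.ratio` heuristic above compares total values to distinct values in a string column. A minimal plain-Scala sketch of that decision, with a hypothetical helper name (the real logic lives inside Comet's shuffle writer and is not this function):

```scala
// Decide whether a string column should be dictionary-encoded for shuffle.
// `ratioThreshold` mirrors spark.comet.shuffle.preferDictionary.ratio (default 10.0).
def preferDictionary(values: Seq[String], ratioThreshold: Double = 10.0): Boolean = {
  // Per the config description, the threshold is only effective above 1.0.
  if (ratioThreshold <= 1.0) return false
  val distinct = values.distinct.size
  // Prefer dictionary encoding when values repeat often enough.
  distinct > 0 && values.size.toDouble / distinct > ratioThreshold
}
```

For example, a column of 100 copies of the same string has ratio 100/1 = 100, well above the default 10.0, so dictionary encoding would be preferred; a column of 100 unique strings has ratio 1.0 and would not.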
Contributor

nit: Shall we use backticks (`) instead of single quotes (')?

Member Author

This was not changed by this PR. I think a previous PR changed it but didn't update the document.

Member Author

The document is updated automatically when running `make release` locally.

@@ -2501,6 +2501,13 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim

      case SortExec(sortOrder, _, child, _)
          if isCometOperatorEnabled(op.conf, CometConf.OPERATOR_SORT) =>
        // TODO: Remove this constraint when we upgrade to new arrow-rs including
        // https://github.com/apache/arrow-rs/pull/6225
        if (child.output.length == 1 && child.output.head.dataType.isInstanceOf[StructType]) {
Member

As we add support for other types, do we need to update this to make it recursive, so that we check for a Map or Array containing a struct?

Member Author

Let me add more data types here according to arrow-rs.

          withInfo(op, "Sort on single struct column is not supported")
          return None
        }

        val sortOrders = sortOrder.map(exprToProto(_, child.output))

        if (sortOrders.forall(_.isDefined) && childOp.nonEmpty) {
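The reviewer's question about nested types could be answered with a recursive check rather than the flat `isInstanceOf[StructType]` test. A sketch of that idea, using a simplified stand-in for Spark's `DataType` hierarchy (a real implementation would instead pattern-match on `org.apache.spark.sql.types.StructType`, `ArrayType`, and `MapType`):

```scala
// Simplified mirror of Spark's DataType hierarchy, for illustration only.
sealed trait DataType
case object IntegerType extends DataType
case object StringType extends DataType
case class ArrayType(elementType: DataType) extends DataType
case class MapType(keyType: DataType, valueType: DataType) extends DataType
case class StructType(fields: Seq[DataType]) extends DataType

// Returns true if the type is, or transitively contains, a struct.
def containsStruct(dt: DataType): Boolean = dt match {
  case StructType(_)   => true
  case ArrayType(elem) => containsStruct(elem)
  case MapType(k, v)   => containsStruct(k) || containsStruct(v)
  case _               => false
}
```

With a check like this, a sort on `Array[Struct]` or `Map[Int, Struct]` columns would also fall back to Spark until the upstream arrow-rs fix lands.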
20 changes: 20 additions & 0 deletions spark/src/test/scala/org/apache/comet/exec/CometExecSuite.scala
@@ -64,6 +64,26 @@ class CometExecSuite extends CometTestBase {
    }
  }

  test("Sort on single struct should fallback to Spark") {
    withSQLConf(
      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
      SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key -> "true",
      CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
      CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
      val data =
        Seq(Tuple1(null), Tuple1((1, "a")), Tuple1((2, null)), Tuple1((3, "b")), Tuple1(null))

      withParquetFile(data) { file =>
        readParquetFile(file) { df =>
          val sort = df.sort("_1")
          checkSparkAnswer(sort)
        }
      }
    }
  }
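The test data wraps pairs in `Tuple1` so the resulting DataFrame has exactly one column whose Parquet type is a struct, which is precisely the shape the serde change rejects. A plain-Scala illustration of that shape (no Spark required; the column name `_1` in the test comes from this single tuple field):

```scala
// Each row has a single field `_1` whose value is itself a two-field
// pair (a struct column once written to Parquet), or null.
val data: Seq[Tuple1[(Int, String)]] =
  Seq(Tuple1(null), Tuple1((1, "a")), Tuple1((2, null)), Tuple1((3, "b")), Tuple1(null))
```

So the DataFrame has one top-level column of struct type, and `df.sort("_1")` sorts on that struct column, triggering the fallback path.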

  test("Native window operator should be CometUnaryExec") {
    withTempView("testData") {
      sql("""