
chore: add assertion that not using comet scan but using native scan #1793

Closed
rluvaton wants to merge 2 commits into apache:main from rluvaton:add-assertion-for-not-using-datafusion-scan

Conversation

@rluvaton
Member

Which issue does this PR close?

N/A

Rationale for this change

Just adds an assertion that the DataFusion scan is in use whenever the DataFusion filter is used, making sure the code in #1746 is always correct.

What changes are included in this PR?

Just an assertion

How are these changes tested?

Existing tests

@rluvaton
Member Author

rluvaton commented May 27, 2025

There are a lot of failures. I think I found the error and am pushing a fix; maybe this is the reason:

plan match {
case w: CometScanWrapper => containsNativeCometScan(w.originalPlan)
case scan: CometScanExec => scan.scanImpl == CometConf.SCAN_NATIVE_COMET
case _: CometScanWrapper => true
Member Author


Changed it to return true as native scan exec is not wrapped in CometScanWrapper

Member Author

@rluvaton rluvaton May 27, 2025


and when the scan is not native_datafusion it will not use the DataFusion filter when using native_iceberg_compat

case scan: CometScanExec => scan.scanImpl != CometConf.SCAN_NATIVE_DATAFUSION
case _: CometNativeScanExec => false
case _ => plan.children.exists(containsNativeCometScan)
case _ => plan.children.isEmpty || plan.children.exists(containsNonDataFusionScan)
Member Author


`exists` returns false for empty children, which means this is the data source, right? And if so it is using the ScanExec of Rust and not the ScanExec of DataFusion.
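A minimal Rust analogue of the point above (illustrative only, not Comet code): `exists` in Scala and `Iterator::any` in Rust both return false on an empty collection, which is why the leaf case needs the explicit `isEmpty` guard.

```rust
fn main() {
    // An operator with no children is a data source (e.g. ExistingRDD).
    let children: Vec<i32> = Vec::new();

    // `any` on an empty iterator is always false, so without a guard a
    // leaf node would wrongly report that no non-DataFusion scan exists.
    assert!(!children.iter().any(|_| true));

    // Hence the fix: treat an empty-children node as the data source itself.
    let is_source_or_has_scan = children.is_empty() || children.iter().any(|_| true);
    assert!(is_source_or_has_scan);
}
```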

Contributor


ScanExec of rust and not ScanExec of DataFusion

what does this mean?

Member Author


We must start from a scan, right? Either ScanExec or DataFusion's DataSourceExec. So if there are no children, it means we are at the root, for example ExistingRDD, and therefore we must use ScanExec, as otherwise it would have been converted to CometNativeScanExec.

Comment on lines +959 to +964
assert_eq!(
scans.len(),
0,
    "Must not use ScanExec as it may require a copy"
);

Member


My understanding is that we only reuse buffers when using native_comet to read the Parquet files. If native_iceberg_compat is used then we will still have a ScanExec reading from the JVM, but it is safe to use DataFusion's FilterExec in this case.

@parthchandra would you agree with this?

Contributor


That is correct. native_iceberg_compat does not reuse buffers.

@codecov-commenter

codecov-commenter commented May 27, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 59.46%. Comparing base (f09f8af) to head (cceb32a).
⚠️ Report is 1164 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1793      +/-   ##
============================================
+ Coverage     56.12%   59.46%   +3.33%     
- Complexity      976     1140     +164     
============================================
  Files           119      128       +9     
  Lines         11743    12528     +785     
  Branches       2251     2356     +105     
============================================
+ Hits           6591     7450     +859     
+ Misses         4012     3887     -125     
- Partials       1140     1191      +51     


@andygrove
Member

Thanks @rluvaton but this PR does not appear to help with the CI issue (the tests are still failing - see https://github.com/apache/datafusion-comet/actions/runs/15278587551/job/42973846520?pr=1793).

The scans in Comet are pretty confusing. In the native plan we either have DataFusion's DataSourceExec, or a Comet ScanExec that reads data from the JVM. The source of data for ScanExec could be a broadcast or shuffle exchange, or a Parquet scan either using native_comet or native_iceberg_compat. When using native_comet there is buffer re-use and that is when we need to use Comet's FilterExec.
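The rule described above can be sketched as a small decision function. This is a hedged illustration only; the `ScanSource` enum and `reuses_buffers` helper are hypothetical names, not Comet's actual types.

```rust
// Hypothetical enumeration of the data sources a Comet ScanExec can read
// from, per the comment above (not Comet's actual API).
#[allow(dead_code)]
enum ScanSource {
    Broadcast,                  // rows arriving from a broadcast exchange
    Shuffle,                    // rows arriving from a shuffle exchange
    ParquetNativeComet,         // native_comet reader: reuses its buffers
    ParquetNativeIcebergCompat, // native_iceberg_compat reader: no reuse
}

// Only the native_comet Parquet path reuses buffers, so only it forces
// Comet's FilterExec (or a defensive copy) rather than DataFusion's.
fn reuses_buffers(src: &ScanSource) -> bool {
    matches!(src, ScanSource::ParquetNativeComet)
}

fn main() {
    assert!(reuses_buffers(&ScanSource::ParquetNativeComet));
    assert!(!reuses_buffers(&ScanSource::ParquetNativeIcebergCompat));
    assert!(!reuses_buffers(&ScanSource::Broadcast));
}
```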

@rluvaton
Member Author

rluvaton commented May 28, 2025

I thought everything that came from the JVM is reusing buffers. If that's not the case, then the copy should not always be added when there is a ScanExec, no? (Looking at the wrap-in-CopyExec code.)

@andygrove
Member

I thought everything that came from JVM is reusing buffers, if it's not the case than the copy should not always be added when there is a ScanExec, no? (looking at the wrap in copy exec code)

That's a good point. We currently add CopyExec for all ScanExecs, regardless of the source. This could be optimized.
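A minimal sketch of the hazard being discussed (illustrative only; a `Vec` stands in for a reused Arrow buffer, and this is not Comet's actual CopyExec): if the producer reuses its output buffer across batches, any operator that holds a batch across calls must snapshot it first.

```rust
fn main() {
    // Producer's reusable output buffer (stands in for a native_comet batch).
    let mut reused: Vec<i32> = vec![1, 2, 3];

    // "CopyExec": snapshot the batch before the producer overwrites it.
    let snapshot = reused.clone();

    // Producer fills the same buffer with the next batch.
    reused.copy_from_slice(&[4, 5, 6]);

    // Without the copy, an operator that buffered the first batch
    // (e.g. an aggregation) would now see [4, 5, 6] instead.
    assert_eq!(snapshot, vec![1, 2, 3]);
    assert_eq!(reused, vec![4, 5, 6]);
}
```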

@rluvaton
Member Author

I still think there is a bug here:

For this test (when running on main):

test("debug datafusion native filter") {
  val schema = StructType(
    Seq(
      StructField("row_idx", IntegerType, nullable = false),
      StructField("int", IntegerType, nullable = false)))

  val data = DataGenerator.DEFAULT.generateRows(1000, schema)

  withSQLConf(
    CometConf.COMET_EXPLAIN_VERBOSE_ENABLED.key -> "true",
    CometConf.COMET_EXPLAIN_NATIVE_ENABLED.key -> "true",
    CometConf.COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST.key -> "RDDScan") {
    val df = spark
      .createDataFrame(spark.sparkContext.parallelize(data, 1), schema)
      .where(col("row_idx") < 10000 || col("row_idx") > 10010)

    df.explain(true)
    df
      .show()
  }
}

The spark plan is:

== Parsed Logical Plan ==
'Filter (('row_idx < 10000) OR ('row_idx > 10010))
+- LogicalRDD [row_idx#2, int#3], false

== Analyzed Logical Plan ==
row_idx: int, int: int
Filter ((row_idx#2 < 10000) OR (row_idx#2 > 10010))
+- LogicalRDD [row_idx#2, int#3], false

== Optimized Logical Plan ==
Filter ((row_idx#2 < 10000) OR (row_idx#2 > 10010))
+- LogicalRDD [row_idx#2, int#3], false

== Physical Plan ==
*(2) CometColumnarToRow
+- CometFilter [row_idx#2, int#3], ((row_idx#2 < 10000) OR (row_idx#2 > 10010))
   +- CometSparkRowToColumnar
      +- *(1) Scan ExistingRDD[row_idx#2,int#3]

and the datafusion plan is:

25/05/28 19:17:14 INFO core/src/execution/jni_api.rs: Comet native query plan:
FilterExec: col_0@0 < 10000 OR col_0@0 > 10010
  ScanExec: source=[CometSparkRowToColumnar (unknown)], schema=[col_0: Int32, col_1: Int32]

It is using DataFusion's FilterExec and not CometFilter, while it should use Comet's filter since there is buffer reuse, no?

@andygrove
Member

I still think there is a bug here: […] It is using DataFusion Filter and not CometFilter while it should use comet filter as there is reuse, no?

In this example, Spark (not Comet) is performing the scan. Comet is then performing the row-to-columnar conversion. The native_comet scan is not being used so there is no need to use Comet's filter.

@rluvaton
Member Author

Now I'm really confused. Where in the code is there reuse of buffers?

@rluvaton
Member Author

Also, isn't buffer reuse only part of the reason to copy? For instance, the Spark side could call the Arrow release function over the FFI while the array is still being used, for example in aggregations.

@andygrove
Member

Now I'm really confused. Where in the code is there reuse of buffers?

The key structs are MutableBuffer and ParquetMutableBuffer.

@rluvaton rluvaton closed this Jun 4, 2025
@rluvaton rluvaton deleted the add-assertion-for-not-using-datafusion-scan branch June 4, 2025 10:00