Skip to content

Experiment: Coalesce batches after scan#496

Closed
andygrove wants to merge 4 commits intoapache:mainfrom
andygrove:coalesce-batches
Closed

Experiment: Coalesce batches after scan#496
andygrove wants to merge 4 commits intoapache:mainfrom
andygrove:coalesce-batches

Conversation

@andygrove
Copy link
Copy Markdown
Member

Which issue does this PR close?

Part of #495

Rationale for this change

Determine if coalescing small batches improves performance.

What changes are included in this PR?

Coalesce batches after file scan + projection

How are these changes tested?

I ran manual benchmarks with q14.

before

    "14": [
        8.631258964538574,
        6.470786809921265,
        6.360034465789795,
        6.075344562530518,
        6.00908350944519,
        6.089619159698486,
        5.928948163986206,
        6.05573034286499,
        6.084570407867432,
        5.822378635406494
    ]

after

    "14": [
        8.460940837860107,
        6.0459511280059814,
        6.053459644317627,
        5.822834014892578,
        5.788251161575317,
        5.765811204910278,
        6.015196084976196,
        5.922304391860962,
        5.736841917037964,
        5.718552589416504
    ]

This shows a 4% improvement of the median result, so may not be conclusive. Needs more testing.

@andygrove
Copy link
Copy Markdown
Member Author

The change in this PR worked fine with q14 but caused this failure with q1:

py4j.protocol.Py4JJavaError: An error occurred while calling o200.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 161 in stage 8.0 failed 4 times, most recent failure: Lost task 161.3 in stage 8.0 (TID 195) (192.168.86.32 executor 0): org.apache.comet.CometNativeException: General execution error with reason org.apache.comet.CometNativeException: called `Option::unwrap()` on a `None` value
        at comet::errors::init::{{closure}}(__internal__:0)
        at std::panicking::rust_panic_with_hook(__internal__:0)
        at std::panicking::begin_panic_handler::{{closure}}(__internal__:0)
        at std::sys_common::backtrace::__rust_end_short_backtrace(__internal__:0)
        at rust_begin_unwind(__internal__:0)
        at core::panicking::panic_fmt(__internal__:0)
        at core::panicking::panic(__internal__:0)
        at arrow_row::row_lengths(__internal__:0)
        at arrow_row::RowConverter::append(__internal__:0)
        at arrow_row::RowConverter::convert_columns(__internal__:0)
        at core::iter::adapters::map::map_try_fold::{{closure}}(__internal__:0)
        at arrow_row::RowConverter::append(__internal__:0)
        at arrow_row::RowConverter::convert_columns(__internal__:0)
        at <datafusion_physical_plan::aggregates::group_values::row::GroupValuesRows as datafusion_physical_plan::aggregates::group_values::GroupValues>::intern(__internal__:0)
        at <datafusion_physical_plan::aggregates::row_hash::GroupedHashAggregateStream as futures_core::stream::Stream>::poll_next(__internal__:0)
        at comet::execution::jni_api::Java_org_apache_comet_Native_executePlan::{{closure}}(__internal__:0)

I am surprised that having the projection produce larger batches would cause something like this, but I am not yet familiar with the scala/rust interactions about this, so there is probably something that I am not understanding.

@viirya @sunchao Can you help explain why my change is causing issues?

@viirya
Copy link
Copy Markdown
Member

viirya commented Jun 4, 2024

I've checked the implementation of CoalesceBatchesExec. It actually buffers produced batches from its upstream. Comet scan reuses vectors when producing batches, so when you poll next batch from Comet scan, the previous batch's content is overwritten. To do with it, you probably need to add a Comet CopyExec operator before CoalesceBatchesExec.

@andygrove
Copy link
Copy Markdown
Member Author

I ran benchmarks with this change and see no improvement, so closing this as a failed experiement.

@andygrove andygrove closed this Jun 5, 2024
coderfender pushed a commit to coderfender/datafusion-comet that referenced this pull request Dec 13, 2025
## Which issue does this PR close?

<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax. For example
`Closes apache#123` indicates that this PR will close issue apache#123.
-->

N/A

## Rationale for this change

<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->

Apply OSS 0.3.0 changes.

## What changes are included in this PR?

<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->

```
84cccf7 docs: Add notes for IntelliJ code size limits for code inspections. (apache#985)
dcc4a8a fix: The spilled_bytes metric of CometSortExec should be size instead of time (apache#984)
f64553b chore: fix compatibility guide (apache#978)
0ee7df8 chore: Enable additional CreateArray tests (apache#928)
a690e9d perf: Remove one redundant CopyExec for SMJ (apache#962)
a8156b5 chore: update rem expression guide (apache#976)
317a534 fix: Use the number of rows from underlying arrays instead of logical row count from RecordBatch (apache#972)
22561c4 doc: add documentation interlinks (apache#975)
b4de8e0 chore: Update benchmarks results based on 0.3.0-rc1 (apache#969)
94093f3 chore: fix publish-to-maven script (apache#966)
f31f6cc Generate changelog for 0.3.0 release (apache#964)
5663fc2 fix: div and rem by negative zero (apache#960)
50517f6 perf: Optimize decimal precision check in decimal aggregates (sum and avg) (apache#952)
5b3f7bc fix: CometScanExec on Spark 3.5.2 (apache#915)
8410c71 chore: clarify tarball installation (apache#959)
459b2b0 fix: window function range offset should be long instead of int (apache#733)
```


## How are these changes tested?

<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code

If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants