
Conversation

@alamb commented Apr 7, 2021

Rationale

Once the number of rows needed for a limit query has been produced, any further work done to read values from its input is wasted.

The current implementation of LimitStream keeps polling its input for the next value, discarding the result and returning Poll::Ready(None), even once the limit has been reached.

For queries like select * from foo limit 10, which are typical during initial data exploration, this is very wasteful.

Changes

This PR changes LimitStream so that it drops its input once the limit has been reached. This potentially frees resources (memory, file handles, etc.) and avoids unnecessary computation.
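The mechanism can be sketched with a plain Iterator as a synchronous stand-in for Stream, so the example is self-contained. LimitIter and its fields are illustrative names for this sketch, not DataFusion's actual LimitStream:

```rust
// A minimal, synchronous sketch of the idea in this PR, using Iterator in
// place of Stream. All names here are illustrative.
struct LimitIter<I> {
    // `Option` so the input can be dropped (set to `None`) as soon as the
    // limit is reached, releasing its resources early.
    input: Option<I>,
    limit: usize,
    produced: usize,
}

impl<I: Iterator> Iterator for LimitIter<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        if self.produced >= self.limit {
            // Limit already reached: make sure the input is gone and stop,
            // without ever touching the input again.
            self.input = None;
            return None;
        }
        match self.input.as_mut()?.next() {
            Some(item) => {
                self.produced += 1;
                if self.produced == self.limit {
                    // Drop the input the moment the limit is hit, rather
                    // than holding it until this wrapper itself is dropped.
                    self.input = None;
                }
                Some(item)
            }
            None => {
                // Input exhausted before the limit: also drop it.
                self.input = None;
                None
            }
        }
    }
}

fn main() {
    let mut limited = LimitIter { input: Some(0..1000), limit: 3, produced: 0 };
    let rows: Vec<i32> = limited.by_ref().collect();
    assert_eq!(rows, vec![0, 1, 2]);
    // The input was dropped as soon as the third item was produced.
    assert!(limited.input.is_none());
    println!("{:?}", rows);
}
```

After the limit is hit, subsequent calls to next() never touch the input again, satisfying the "don't poll a finished source" contract as a side effect of the drop.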

@alamb (author):

The actual code change for this PR is very small -- the rest of the changes are related to writing a proper test for it

Contributor:

I have a vague memory that FusedStream may have something to do with this property (although /noideadog)

@alamb (author):

This is a good point.

One benefit of this PR over fuse() is that it will actually drop the input stream (freeing its resources), in addition to not calling the input stream again: https://docs.rs/futures-util/0.3.13/src/futures_util/stream/stream/fuse.rs.html#10
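To illustrate the difference: a fuse-style adapter only remembers that its source finished; the source itself stays alive inside the wrapper. A minimal sketch with illustrative names, modeled loosely on the Fuse source linked above but using Iterator so it runs standalone:

```rust
// A fuse-style adapter guards against calling the inner source again,
// but the inner value (and any resources it holds) stays alive until
// the whole wrapper is dropped. Names here are illustrative.
struct Fused<I> {
    inner: I, // retained even after exhaustion
    done: bool,
}

impl<I: Iterator> Iterator for Fused<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        if self.done {
            // Never call `inner.next()` again -- but note that
            // `self.inner` is still held here, not dropped.
            return None;
        }
        match self.inner.next() {
            Some(x) => Some(x),
            None => {
                self.done = true;
                None
            }
        }
    }
}

fn main() {
    let mut fused = Fused { inner: vec![1, 2].into_iter(), done: false };
    assert_eq!(fused.next(), Some(1));
    assert_eq!(fused.next(), Some(2));
    assert_eq!(fused.next(), None);
    // Safe to call again after exhaustion, but `inner` is still held.
    assert_eq!(fused.next(), None);
}
```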

@alamb force-pushed the alamb/limit_less_run branch 2 times, most recently from f2632af to 99505b4 (April 7, 2021 13:33)
@andygrove left a comment:

LGTM. This should have quite an impact on some of our benchmarks I imagine.

Contributor:

Why not just compare self.current_len == self.limit and short-circuit before polling the wrapped stream, instead of the Option plumbing?

@alamb (author):

My thinking was that the Option plumbing actually drops the input, freeing its resources as soon as the limit has been hit, rather than waiting for the execution to be complete.
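The early-freeing behavior of the Option plumbing can be seen with a tiny standalone example; Resource and Wrapper below are hypothetical types used only for illustration:

```rust
use std::cell::Cell;
use std::rc::Rc;

// Demonstrates that setting an `Option` field to `None` runs the
// destructor of its contents immediately, not when the enclosing
// struct is eventually dropped. Types here are hypothetical.
struct Resource {
    freed: Rc<Cell<bool>>,
}

impl Drop for Resource {
    fn drop(&mut self) {
        self.freed.set(true);
    }
}

struct Wrapper {
    input: Option<Resource>,
}

fn main() {
    let freed = Rc::new(Cell::new(false));
    let mut w = Wrapper { input: Some(Resource { freed: Rc::clone(&freed) }) };
    assert!(!freed.get());
    w.input = None; // the destructor runs right here
    // The resource is already freed while `w` itself is still alive.
    assert!(freed.get());
    println!("freed early: {}", freed.get());
}
```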

Contributor:

Fair enough

Contributor:

Do we do this in other places too? Isn't a SendableRecordBatchStream a small struct?

@alamb (author):

Isn't a SendableRecordBatchStream a small struct?

It is a trait, so there are various things that implement it. Some, like the ParquetStream (Ok(Box::pin(ParquetStream {), could have substantial resources.

Contributor:

As far as I understand it now, it can consist of the whole tree of dependent streams. Probably still not a big resource hog but more than a few bytes.

@alamb (author):

Imagine it is actually a subquery with a group by hash or join with a large hash table :) It may actually be hanging on to a substantial amount of memory I suspect

Contributor:

Ah, yeah that's right!

@tustvold commented Apr 7, 2021

From the code I can see that it polls the inner stream once, discarding the result and returning None, but at that point it should not be polled again. According to the Stream trait:

Once a stream has finished (returned Ready(None) from poll_next), calling its poll_next method again may panic, block forever, or cause other kinds of problems; the Stream trait places no requirements on the effects of such a call. However, as the poll_next method is not marked unsafe, Rust's usual rules apply: calls must never cause undefined behavior (memory corruption, incorrect use of unsafe functions, or the like), regardless of the stream's state.

If this is difficult to guard against then the fuse adapter can be used to ensure that poll_next always returns Ready(None) in subsequent calls.

If this is occurring, there may be a more fundamental issue at play here that also needs fixing

@alamb commented Apr 7, 2021

If this is occurring, there may be a more fundamental issue at play here that also needs fixing

@tustvold my initial reading of LimitStream also suggested to me that input.poll_next would only be called once after the limit had been hit.

The limit_early_shutdown test in fact demonstrates this bug (without the changes in this PR, it fails after consuming one more input than it should).

I had a test (99505b4#diff-34dec6459ccea51c881a6ea392be9ad35f112395e6b8742df32a1742ac651e31L1799) that ran an entire query and I interpreted some println! output as showing that all the inputs were consumed (aka that poll_next() was repeatedly called). I did not debug it further.

I am confident that this is an improvement to LimitStream -- there may well be some more fundamental issue that can also be fixed / improved in subsequent PRs.

@Dandandan commented Apr 7, 2021

Is this necessary? It could be based on self.current_len == self.limit, or otherwise on a boolean like limit_exhausted?


Contributor:

haha 👍 😆


github-actions bot commented Apr 7, 2021

codecov-io commented Apr 7, 2021

Codecov Report

Merging #9926 (ad7c712) into master (81f6521) will increase coverage by 0.04%.
The diff coverage is 88.50%.


@@            Coverage Diff             @@
##           master    #9926      +/-   ##
==========================================
+ Coverage   82.70%   82.75%   +0.04%     
==========================================
  Files         257      258       +1     
  Lines       60486    60620     +134     
==========================================
+ Hits        50027    50167     +140     
+ Misses      10459    10453       -6     
Impacted Files Coverage Δ
rust/benchmarks/src/bin/tpch.rs 38.33% <0.00%> (ø)
rust/datafusion/src/physical_plan/hash_join.rs 83.60% <75.00%> (-1.00%) ⬇️
rust/datafusion/src/test/exec.rs 85.00% <85.00%> (ø)
rust/datafusion/src/physical_plan/planner.rs 80.43% <89.47%> (+0.50%) ⬆️
rust/datafusion/src/execution/context.rs 92.93% <94.73%> (-0.10%) ⬇️
rust/datafusion/src/physical_plan/limit.rs 79.82% <96.66%> (+23.15%) ⬆️
rust/datafusion/src/execution/dataframe_impl.rs 88.88% <100.00%> (-0.22%) ⬇️
rust/datafusion/src/logical_plan/builder.rs 88.88% <100.00%> (+0.70%) ⬆️
rust/datafusion/src/physical_plan/repartition.rs 82.91% <100.00%> (+1.69%) ⬆️
rust/datafusion/src/test/mod.rs 100.00% <100.00%> (ø)
... and 6 more


Legend:
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 6b67b57...ad7c712.

@alamb force-pushed the alamb/limit_less_run branch from 99505b4 to ad7c712 (April 7, 2021 21:59)
@alamb marked this pull request as ready for review April 7, 2021 21:59
@alamb commented Apr 8, 2021

Any last thoughts on this PR?

@Dandandan left a comment:

Looks good! (Small linting error)

@alamb force-pushed the alamb/limit_less_run branch from d7cc9ca to b923fdd (April 8, 2021 22:00)
@alamb commented Apr 8, 2021

Looks good! (Small linting error)

Thanks @Dandandan -- I fixed that up so hopefully this will be good to go tomorrow as well


6 participants